1

FASTプロトコルの 2 つのフィード アービトレートの問題を解決しています。よくわからなくても心配しないでください。私の質問は実際にはかなり一般的なものです。しかし、興味のある人のために問題の説明を追加しています (スキップできます)。


すべての UDP フィードのデータは、2 つの異なるマルチキャスト IP 上の 2 つの同一のフィード (A と B) で配布されます。UDP パケット損失の可能性があるため、クライアントが両方のフィードを受信して​​処理することを強くお勧めします。2 つの同一のフィードを処理することで、統計的にパケット損失の可能性を減らすことができます。どの特定のフィード (A または B) でメッセージが最初に表示されるかは指定されていません。これらのフィードを調停するには、プリアンブルまたはタグ 34-MsgSeqNum にあるメッセージ シーケンス番号を使用する必要があります。プリアンブルを利用すると、FAST メッセージをデコードせずにメッセージ シーケンス番号を決定できます。フィード A および B からのメッセージの処理は、次のアルゴリズムを使用して実行する必要があります。

  1. フィード A と B を聞く
  2. シーケンス番号に従ってメッセージを処理します。
  3. 同じシーケンス番号を持つメッセージが以前に処理された場合、メッセージを無視します。
  4. シーケンス番号にギャップがある場合は、両方のフィード (A と B) でパケットが失われていることを示しています。クライアントは、リカバリ プロセスの 1 つを開始する必要があります。しかし、まず最初に、クライアントは妥当な時間待機する必要があります。おそらく、パケットの並べ替えが原因で、失われたパケットが少し遅れて到着する可能性があります。UDP プロトコルは、一連のパケットの配信を保証できません。

    // tcp 回復アルゴリズムの詳細


私はそのような非常に単純なクラスを書きました。必要なすべてのクラスを事前に割り当ててから、特定のものを受け取った最初のスレッドがそれseqNumを処理できます。別のスレッドが後でドロップします。

class MsgQueue
{
public:
    MsgQueue();
    ~MsgQueue(void);
    bool Lock(uint32_t msgSeqNum);
    Msg& Get(uint32_t msgSeqNum);
    void Commit(uint32_t msgSeqNum);
private:
    void Process();
    static const int QUEUE_LENGTH = 1000000;

    // 0 - available for use; 1 - processing; 2 - ready
    std::atomic<uint16_t> status[QUEUE_LENGTH];
    Msg updates[QUEUE_LENGTH];
};

実装:

MsgQueue::MsgQueue()
{
        memset(status, 0, sizeof(status));
}

MsgQueue::~MsgQueue(void)
{
}

// For the same msgSeqNum should return true to only one thread 
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
    uint16_t expected = 0;
    return status[msgSeqNum].compare_exchange_strong(expected, 1);
}

void MsgQueue::Commit(uint32_t msgSeqNum)
{
    status[msgSeqNum] = 2;
            Process();
}

    // this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
    return updates[msgSeqNum];
}

void MsgQueue::Process()
{
        // ready packets must be processed, 
}

使用法:

if (!msgQueue.Lock(seq)) {
    return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);

QUEUE_LENGTH が無限大であると仮定すると、これは問題なく機能します。この場合、1 つの msgSeqNum = 1 つのupdates配列アイテムだからです。

しかし、履歴全体 (何百万ものパケット) を保存することは不可能であり、そうする理由がないため、バッファーを循環させる必要があります。実際には、セッションを再構築するのに十分なパケットをバッファリングする必要があり、セッションが再構築されたら、それらをドロップできます。

ただし、循環バッファーを使用すると、アルゴリズムが大幅に複雑になります。たとえば、長さ 1000 の循環バッファがあるとします。同時に、seqNum = 10 000 および seqNum = 11 000 を処理しようとします (これは非常にありそうもないことですが、それでも可能です)。これらのパケットはどちらもupdatesindex の配列にマップされる0ため、衝突が発生します。このような場合、バッファは古いパケットを「ドロップ」し、新しいパケットを処理する必要があります。

使いたいものを実装するのは簡単ですが、異なるスレッドから使​​用される循環バッファにコードをlocks書くのは本当に複雑です。lock-freeですから、それを行う方法についての提案やアドバイスを歓迎します。ありがとう!

4

1 に答える 1

0

リングバッファが使えるとは思えません。ハッシュされたインデックスstatus[]配列で使用できます。すなわち、hash = seq % 1000。問題は、シーケンス番号がネットワークによって決定され、その順序を制御できないことです。このシーケンス番号に基づいてロックしたいと考えています。配列は無限である必要はなく、シーケンス番号の範囲だけです。しかし、それはおそらく実際よりも大きいです。

シーケンス番号がロックされているときに何が起こっているのかわかりません。これは、別のスレッドが処理していることを意味しますか? その場合、特定のシーケンス番号を解決するために、ハッシュ衝突のサブリストを維持する必要があります。

配列サイズを 2 のべき乗と見なすこともできます。たとえば、1024 を使用するhash = seq & 1023;と、非常に効率的になります。

于 2013-04-28T16:53:00.877 に答える