FASTプロトコルの 2 つのフィード アービトレートの問題を解決しています。よくわからなくても心配しないでください。私の質問は実際にはかなり一般的なものです。しかし、興味のある人のために問題の説明を追加しています (スキップできます)。
すべての UDP フィードのデータは、2 つの異なるマルチキャスト IP 上の 2 つの同一のフィード (A と B) で配布されます。UDP パケット損失の可能性があるため、クライアントが両方のフィードを受信して処理することを強くお勧めします。2 つの同一のフィードを処理することで、統計的にパケット損失の可能性を減らすことができます。どの特定のフィード (A または B) でメッセージが最初に表示されるかは指定されていません。これらのフィードを調停するには、プリアンブルまたはタグ 34-MsgSeqNum にあるメッセージ シーケンス番号を使用する必要があります。プリアンブルを利用すると、FAST メッセージをデコードせずにメッセージ シーケンス番号を決定できます。フィード A および B からのメッセージの処理は、次のアルゴリズムを使用して実行する必要があります。
- フィード A と B を聞く
- シーケンス番号に従ってメッセージを処理します。
- 同じシーケンス番号を持つメッセージが以前に処理された場合、メッセージを無視します。
シーケンス番号にギャップがある場合は、両方のフィード (A と B) でパケットが失われていることを示しています。クライアントは、リカバリ プロセスの 1 つを開始する必要があります。しかし、まず最初に、クライアントは妥当な時間待機する必要があります。おそらく、パケットの並べ替えが原因で、失われたパケットが少し遅れて到着する可能性があります。UDP プロトコルは、一連のパケットの配信を保証できません。
// tcp 回復アルゴリズムの詳細
私はそのような非常に単純なクラスを書きました。必要なすべてのクラスを事前に割り当ててから、特定のものを受け取った最初のスレッドがそれseqNum
を処理できます。別のスレッドが後でドロップします。
class MsgQueue
{
public:
MsgQueue();
~MsgQueue(void);
bool Lock(uint32_t msgSeqNum);
Msg& Get(uint32_t msgSeqNum);
void Commit(uint32_t msgSeqNum);
private:
void Process();
static const int QUEUE_LENGTH = 1000000;
// 0 - available for use; 1 - processing; 2 - ready
std::atomic<uint16_t> status[QUEUE_LENGTH];
Msg updates[QUEUE_LENGTH];
};
実装:
MsgQueue::MsgQueue()
{
memset(status, 0, sizeof(status));
}
MsgQueue::~MsgQueue(void)
{
}
// For the same msgSeqNum should return true to only one thread
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
uint16_t expected = 0;
return status[msgSeqNum].compare_exchange_strong(expected, 1);
}
void MsgQueue::Commit(uint32_t msgSeqNum)
{
status[msgSeqNum] = 2;
Process();
}
// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
return updates[msgSeqNum];
}
void MsgQueue::Process()
{
// ready packets must be processed,
}
使用法:
if (!msgQueue.Lock(seq)) {
return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"
msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);
QUEUE_LENGTH が無限大であると仮定すると、これは問題なく機能します。この場合、1 つの msgSeqNum = 1 つのupdates
配列アイテムだからです。
しかし、履歴全体 (何百万ものパケット) を保存することは不可能であり、そうする理由がないため、バッファーを循環させる必要があります。実際には、セッションを再構築するのに十分なパケットをバッファリングする必要があり、セッションが再構築されたら、それらをドロップできます。
ただし、循環バッファーを使用すると、アルゴリズムが大幅に複雑になります。たとえば、長さ 1000 の循環バッファがあるとします。同時に、seqNum = 10 000 および seqNum = 11 000 を処理しようとします (これは非常にありそうもないことですが、それでも可能です)。これらのパケットはどちらもupdates
index の配列にマップされる0
ため、衝突が発生します。このような場合、バッファは古いパケットを「ドロップ」し、新しいパケットを処理する必要があります。
使いたいものを実装するのは簡単ですが、異なるスレッドから使用される循環バッファにコードをlocks
書くのは本当に複雑です。lock-free
ですから、それを行う方法についての提案やアドバイスを歓迎します。ありがとう!