物事を同期するために、常にしばらくブロックする必要があります。実際にワーカーのプールにリクエストを送信し、レスポンスが受信されたときに、それが後続のレスポンスでない場合にバッファリングすることができます。単純なワークフローの 1 つを、疑似言語で次のように記述できます。
socket receiver; # zmq.PULL
socket workers; # zmq.DEALER, the worker thread socket is started as zmq.DEALER too.
poller = poller(receiver, workers);
next_id_req = incr()
out_queue = queue;
out_queue.last_id = next_id_req
buffer = sorted_queue;
sock = poller.poll()
if sock is receiver:
packet_N = receiver.recv()
# send N for processing
worker.send(packet_N, ++next_id_req)
else if sock is workers:
# get a processed response Func(N)
func_N_response, id = workers.recv()
if out_queue.last_id != id-1:
# not subsequent id, buffer it
buffer.push(id, func_N_rseponse)
else:
# in order, push to out queue
out_queue.push(id, func_N_response)
# also consume all buffered subsequent items
while (out_queue.last_id == buffer.min_id() - 1):
id, buffered_N_resp = buffer.pop()
out_queue.push(id, buffered_N_resp)
しかし、ここで、処理スレッド (ワーカープール) でパケットが失われた場合にどうなるかという問題が発生します。特定のタイムアウト後にパケットをスキップし (バッファを out キューにフラッシュする)、out キューへの入力を続行することができます。パケットが後で来たときに並べ替えます。