0

zmq ソリューションを求めている次の問題があります。私は時系列データを持っています:

A,B,C,D,E,...

各ポイントで操作 Func を実行する必要があります。

zmq を介して複数のワーカーを使用してタスクを並列化することは理にかなっています。しかし、私をつまずかせているのは、結果を同期する方法です。つまり、結果は、入力データが入ってきたのとまったく同じように時間順に並べられる必要があります。したがって、最終結果は次のようになります。

Func(A), Func(B), Func(C), Func(D),...

Func(A) は Func(B) とは少し異なります。これにより、しばらくブロックする必要がある場合があります。

どんな提案でも大歓迎です。

4

1 に答える 1

1

物事を同期するために、常にしばらくブロックする必要があります。実際にワーカーのプールにリクエストを送信し、レスポンスが受信されたときに、それが後続のレスポンスでない場合にバッファリングすることができます。単純なワークフローの 1 つを、疑似言語で次のように記述できます。

socket receiver; # zmq.PULL
socket workers; # zmq.DEALER, the worker thread socket is started as zmq.DEALER too.
poller = poller(receiver, workers);

next_id_req = incr()
out_queue = queue;
out_queue.last_id = next_id_req
buffer = sorted_queue;

sock = poller.poll()
if sock is receiver:
    packet_N = receiver.recv()
    # send N for processing
    worker.send(packet_N, ++next_id_req)

else if sock is workers:
   # get a processed response Func(N)
   func_N_response, id = workers.recv()
   if out_queue.last_id != id-1:
       # not subsequent id, buffer it
       buffer.push(id, func_N_rseponse)
   else:
       # in order, push to out queue
       out_queue.push(id, func_N_response)

       # also consume all buffered subsequent items
       while (out_queue.last_id == buffer.min_id() - 1):
           id, buffered_N_resp = buffer.pop()
           out_queue.push(id, buffered_N_resp)

しかし、ここで、処理スレッド (ワーカープール) でパケットが失われた場合にどうなるかという問題が発生します。特定のタイムアウト後にパケットをスキップし (バッファを out キューにフラッシュする)、out キューへの入力を続行することができます。パケットが後で来たときに並べ替えます。

于 2013-12-16T20:46:26.180 に答える