4

私は現在、かなり大きなデータ処理タスクに取り組んでおり、処理を分割して分散する必要があります。ZeroMQのPUSH/PULLメカニズムを使用して完全に機能するパイプラインプロトタイプがあります。「処理中」のデータが送信され、正常に完了します。

ただし、データがファンアウトされ、処理が完了したら、ワーカー(while / trueループで実行)に終了するように指示する必要があります。これには、ZMQのPUB/SUBメカニズムを使用するのが賢明だと思いました。ただし、どの制御メッセージもワーカーによって受信されていません。

苛立たしいことに、PUSH / PULLメカニズムに切り替えると、ワーカーはメッセージを受信して​​正しく終了します。

ここにプロトタイプからいくつかのコードをコピーしました:

http://pastebin.com/myZyWQ1E

プロデューサー(ワーカーにデータを送信するビット)が制御ソケットを作成してバインドし(19行目)、ワーカーもこれをバインドしていることがわかります(53行目)。プロデューサーからの作業が完了すると、制御ソケットを介して終了コマンドが送信され、ワー​​カーはループを続けて制御メッセージをチェックします。残念ながら、75行目は真のテストを行わず、プロセスはループし続けます。

// Control socket create / bind from producer
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
$this->controlSocket->bind('tcp://172.0.0.1:51001');

// Control socket create / bind from worker
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');
4

1 に答える 1

6

ええ、あなたはおそらく「スロージョイナー」問題のインスタンスにぶつかっています。PUBはブロックしないため、送信先のユーザーがいない場合はメッセージをドロップします。ソケットを作成してすぐに送信するため、サブスクライバーは引き続き接続します。これにはゼロ以外の時間がかかるため、メッセージは失われます。開始したらすぐにコントロールソケットを作成してみてください。接続が安定する機会が与えられます(または、コントロールソケットのbind()とsend()の間に少しスリープを入れます)。詳細については、ZGuideを参照してください:http://zguide.zeromq.org/page:all 「slowjoiner」を検索)。

私が気付いたもう1つのことは、ポーリングの外部で制御ソケットをチェックしていることです。ポーリングの目的は、まさにそのようなことを実行できるようにすることです。これを、作成済みのポーリングセットに別のPOLL_INとして追加し、read()で返されたソケットが===(トリプルが等しい!)であるかどうかを確認します。制御ソケットまたはデータソケット。そうすることで、応答時間が短縮されます。

于 2012-12-06T13:31:38.183 に答える