3

ZeroMQで奇妙な問題が発生しています。この問題では、一部のメッセージがスタックし、新しいメッセージが到着するとスタックが解除されます。それは、新しいメッセージがスタックしたメッセージをドアに押し付けるようなものです(ひどい比較です、私は知っています)。

私のコードは非常に単純です:

rep.php

$context = new ZMQContext;
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://localhost:8022");
$receiver2 = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver2->connect("tcp://localhost:8024");

for (;;) {
    echo $receiver->recv() . PHP_EOL;
    echo $receiver2->recv() . PHP_EOL;
}

cnt.phpとcnt2.php(同じコード、異なるポート)

$context = new ZMQContext;
$work = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$work->bind('tcp://*:8022');
$work->send('Hello World');

cnt.phpは8022に送信し、cnt2.phpは8024に送信します。これらは時々実行され、メッセージをrep.phpに送信します。ただし、一部のメッセージがスタックします。cnt.phpから4つのメッセージを送信した場合、何も受信されませんが、cnt2.phpから1つを送信すると、一度に5つのメッセージが表示されます。何か案は?

4

1 に答える 1

1

私はここでは PHP の専門家ではありませんが、構文と機能を推測しています。私が間違っている場合は修正してください

echo $receiver->recv() . ' - ' . $receiver2->recv();

recv() はブロッキング呼び出しでなければなりません。

  1. $receiver->recv()メッセージが受信されるまでブロックします。
  2. ただしecho、メッセージをすぐにエコーしません
  3. あなたは再びブロックされています$receiver2->recv()
  4. 他のファイルからメッセージを送信した場合にのみ、待機しているため、エコー作業が行われます$receiver2->recv()

ソケット recv() を個別に処理したいので、ポーリングまたはイベント ベースの非同期 I/O を使用する必要があります。

試みた解決策

$poll = new ZMQPoll();
$poll->add($receiver, ZMQ::POLL_IN);
$poll->add($receiver2, ZMQ::POLL_IN);
$readable = $writeable = array();
while(true) {
     $events = $poll->poll($readable, $writeable);
     foreach($readable as $socket) {
               $message = $socket->recv();
               echo $message, PHP_EOL;
     }
}

から適応: http://zguide.zeromq.org/php:rrbroker

于 2012-06-22T21:56:49.883 に答える