0

私は、RabbitMQ と AMQP という名前の PHP の PECL 拡張機能を使用して、単純なタスク キューを作成しようとしています。

私の目的はかなり単純です。プロデューサは、処理が必要なオブジェクトのエンベロープを含む特定のキューにメッセージを送信する必要があります。

コンシューマーはすべて、上記のキューをリッスンし、メッセージが来ると処理する必要があります。コンシューマーをさらに追加して、RabbitMq がラウンドロビン方式でメッセージをディスパッチできるようにする必要があります。

Python または Java ライブラリのチュートリアルは非常に簡単に見つかりますが、PHP の PECL ライブラリのチュートリアルは見つかりませんでした。

何かをバインドする必要があるかどうかはよくわかりません.PECLライブラリではそのように実装されていない「basic_publishとbasic_consume」を使用するカスタムphpライブラリで動作する例がありました。

だからここに私がこれまでに得たものがあります:

$oConfig = Zend_Registry::get('config');
$sQueue = $oConfig->amqp->validate_queue_name;

$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();

$oChannel = new AMQPChannel($oConnection);
$oExchange = new AMQPExchange($oChannel);

$sMsg = new stdClass();
$sMsg->nId = $p_nId;
$sMsg->nStatus= $p_nStatus;
try  {
  $oChannel->startTransaction();
  $bResponse = $oExchange->publish($sMgs,$sQueue);
  if (!$bResponse)  {
    echo "<h1>An error occured, the message can't be published</h1>";
    echo "<h3>Sorry i don't know why</h3>";
    exit;
  }
  $oChannel->commitTransaction();
}  catch (Exception $oException)  {
  echo "<h1>An error occured, the message can't be published</h1>";
  echo "<h3>See error below</h3>";
  echo "<pre>";
  echo print_r($oException->getMessage());
  echo "</pre>";
  exit;
}

ワーカー

  $oConfig = Zend_Registry::get('config');
  $oConnection = new AMQPConnection();
  $oConnection->setLogin($oConfig->amqp->login);
  $oConnection->setPassword($oConfig->amqp->pass);
  $oConnection->setVhost($oConfig->amqp->vhost);
  $oConnection->setPort($oConfig->amqp->port);
  $oConnection->connect();

  $oChannel = new AMQPChannel($oConnection);
  $oQueue = new AMQPQueue($oChannel);

  $oQueue->declare($oConfig->amqp->validate_queue_name);

  function processMessage($oMessage, $oQueue) {
    $nId     = $msg->body->nId;
    $nStatus = $msg->body->nStatus;
    $oIniAct = $oActionMap->findBy('id',$nId);

    $sReply  = $oIniAct->updateStatusMisc($nStatus);
    if ($sReply->status == $nStatus)  {
      $oQueue->ack($sMsg['delivery_tag']);
    } else {
    $oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE);
    }
  }

  $oQueue->consume("processMessage",AMQP_NOPARAM);

PHP doc が私に言っているのは、consume() がすべての人に応じてスレッドをロックするということですか? 基本的に、一度に働く労働者は1人だけですか?また、人々がキューをバインドしていることがわかりますが、私が見た基本的な消費を使用した最初のワーカーの例では、それを使用していませんでした。

ご覧のとおり、私はかなり混乱しています。ヘルプ/指示/チュートリアル ASO ... が役立ちます

ありがとう

4

1 に答える 1

0

PHP には同期の性質があるため、はい、consume()メイン スレッドをロックしますが、基本的なロジックは、ソケット接続ですべての受信データを読み取り、それを PHP 構造に変換して、コンシューマー関数にフィードします。

php-amqp を非同期にするために github の問題で議論がありましたが、非同期機能が必要な場合、設計上、PHP は最適な言語ではないことに全員が同意しました。

個人的には、コンシューマー スクリプトを複数回実行します (実際にはそのためのバランサーがあります)。そのため、各コンシューマーは互いに影響を与えず、個別に失敗して再起動する可能性があります。あなたも同じことができると思います。

コンシューマーのアクティビティを監視するデーモンおよびバランサースクリプト(実際には実際のロードバランサーはありません)としてコンシューマースクリプトを一度に複数回実行し(memcacheを介して実行されますが、クリアではありませんが、WFM)、コンシューマーバランサーにアクティビティがない場合は、それらを1つずつ殺します1 つ (ただし、少なくとも 1 つの作業中のコンシューマーがまだ生きている必要があります)。コンシューマーが過負荷になると、バランサー スクリプトがより多くのコンシューマーを開始します。

1 つのメッセージを消費してから終了する必要がある場合は、コンシューマー関数を return にしfalseます。

キューに利用可能なメッセージが少なくとも 1 つあることが確実な場合はAMQPQueue::get()、メイン スレッドをブロックしない (またはブロックしない) メソッドを使用することをお勧めします。

于 2013-07-12T10:56:04.227 に答える