私は、RabbitMQ と AMQP という名前の PHP の PECL 拡張機能を使用して、単純なタスク キューを作成しようとしています。
私の目的はかなり単純です。プロデューサは、処理が必要なオブジェクトのエンベロープを含む特定のキューにメッセージを送信する必要があります。
コンシューマーはすべて、上記のキューをリッスンし、メッセージが来ると処理する必要があります。コンシューマーをさらに追加して、RabbitMq がラウンドロビン方式でメッセージをディスパッチできるようにする必要があります。
Python または Java ライブラリのチュートリアルは非常に簡単に見つかりますが、PHP の PECL ライブラリのチュートリアルは見つかりませんでした。
何かをバインドする必要があるかどうかはよくわかりません.PECLライブラリではそのように実装されていない「basic_publishとbasic_consume」を使用するカスタムphpライブラリで動作する例がありました。
だからここに私がこれまでに得たものがあります:
$oConfig = Zend_Registry::get('config');
$sQueue = $oConfig->amqp->validate_queue_name;
$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();
$oChannel = new AMQPChannel($oConnection);
$oExchange = new AMQPExchange($oChannel);
$sMsg = new stdClass();
$sMsg->nId = $p_nId;
$sMsg->nStatus= $p_nStatus;
try {
$oChannel->startTransaction();
$bResponse = $oExchange->publish($sMgs,$sQueue);
if (!$bResponse) {
echo "<h1>An error occured, the message can't be published</h1>";
echo "<h3>Sorry i don't know why</h3>";
exit;
}
$oChannel->commitTransaction();
} catch (Exception $oException) {
echo "<h1>An error occured, the message can't be published</h1>";
echo "<h3>See error below</h3>";
echo "<pre>";
echo print_r($oException->getMessage());
echo "</pre>";
exit;
}
ワーカー
$oConfig = Zend_Registry::get('config');
$oConnection = new AMQPConnection();
$oConnection->setLogin($oConfig->amqp->login);
$oConnection->setPassword($oConfig->amqp->pass);
$oConnection->setVhost($oConfig->amqp->vhost);
$oConnection->setPort($oConfig->amqp->port);
$oConnection->connect();
$oChannel = new AMQPChannel($oConnection);
$oQueue = new AMQPQueue($oChannel);
$oQueue->declare($oConfig->amqp->validate_queue_name);
function processMessage($oMessage, $oQueue) {
$nId = $msg->body->nId;
$nStatus = $msg->body->nStatus;
$oIniAct = $oActionMap->findBy('id',$nId);
$sReply = $oIniAct->updateStatusMisc($nStatus);
if ($sReply->status == $nStatus) {
$oQueue->ack($sMsg['delivery_tag']);
} else {
$oQueue->nack($sMsg['delivery_tag'],AMQP_REQUEUE);
}
}
$oQueue->consume("processMessage",AMQP_NOPARAM);
PHP doc が私に言っているのは、consume() がすべての人に応じてスレッドをロックするということですか? 基本的に、一度に働く労働者は1人だけですか?また、人々がキューをバインドしていることがわかりますが、私が見た基本的な消費を使用した最初のワーカーの例では、それを使用していませんでした。
ご覧のとおり、私はかなり混乱しています。ヘルプ/指示/チュートリアル ASO ... が役立ちます
ありがとう