3

これが私が現在持っているセットアップについて少しです。

  • データをキューにプッシュ(POST)するREST API
  • キューには、常に実行され、Exchangeに生成されるコンシューマーがあります
  • Exchangeは他のいくつかのキュー(20以上など)にルーティングします
  • (20以上の)キューのそれぞれが特定のタスクを実行します(コンシューマーも常に実行されます)
  • cronジョブが実行され、すべての(20以上の)タスクが完了して、さらに別のキューに生成されるかどうかが確認されます

各コンシューマーが約300MBのRAMを使用しているため(MBだと思いますが、現時点では目の前にありません)、コンシューマーが常に実行されているのが好きかどうかはわかりません。別の実装を探しています。

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

以下の関連する質問では、RPCを使用することが提案されましたが、これに関する問題はRPCです(私の理解では)複数のインスタンスがあります。これはそのままではリソースを大量に消費するプロセスであり、RPC呼び出しを追加すると、サーバーが停止して応答しなくなると思います(間違っている場合は修正してください)。

別のアプローチは、アグリゲーターパターンを使用することでした

これはまさに私が必要としているものに見えますが、ドキュメントが限られていることがわかりました。誰かがこのパターンをしましたか?

私の質問は、それが現在どのように実装されているかに満足しておらず、プロセスを改善する方法を探しているということです。私は、CRONを削除するか、新しいパターンを実装するか、コンシューマーを常に実行しないようにすることを検討しています。

このプロセスは現在、各コンシューマーの単一のインスタンスのみをサポートしています。複数のコンシューマーを持つことができますが、どのように実装したかについては、一度に1つしか必要ありませんでした。

これは、RabbitMQBundleを使用してPHP、Symfony2フレームワークに実装されています

関連する質問:

4

1 に答える 1

5

OldSoundは、RabbitMQバンドルの作成者です。

バンドル自体は、そのままではアグリゲーターパターンをサポートしていませんが、基盤となるphp-amqplibを使用して実装できます。

集約を実行するには、相関IDを使用してメッセージを公開し、そのIDを処理チェーンに沿ってスレッド化する必要があります。次に、アグリゲーターは、その特定のタスクを処理する必要のあるさまざまなワーカーの数に応じて、X個のメッセージを待機します。メッセージを待つ方法は、相関IDによってインデックスが付けられるときに、メッセージを保持する配列を用意することです。

したがって、着信メッセージがあるときはいつでも、次のことを行います。

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

そして、どこかで:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

私は実際にSymfonyのワークフローバンドルに取り組んでおり、近いうちにオープンソース化する予定です。そのバンドルを使用して、提示するユースケースを非常に簡単な方法で実装できます(つまり、各タスクにサービスを提供するだけで済みます)。

では、なぜ各コンシューマーが300MBのRAMを使用するのでしょうか。それらを使用してフルスタックフレームワークを実行する必要がありますか?可能であれば、コンシューマーアプリ用の新しいSymfonyカーネルを作成し、オーバーヘッドを削減するために必要なものだけをロードします。

于 2013-01-30T19:32:35.063 に答える