9

私のアプリケーションのプロデューサー モジュールは、小さなクラスターで実行する作業を送信したいユーザーによって実行されます。RabbitMQ メッセージ ブローカーを介して JSON 形式でサブスクリプションを送信します。

私はいくつかの戦略を試しましたが、これまでのところ最良の方法は次のとおりですが、まだ完全には機能していません。

各クラスター マシンはコンシューマー モジュールを実行します。このモジュールは AMQP キューにサブスクライブし、prefetch_countを発行して、一度に実行できるタスクの数をブローカーに伝えます。

Pika AMQP ライブラリの SelectConnection を使用して動作させることができました。コンシューマーとプロデューサーの両方が 2 つのチャネルを開始し、1 つは各キューに接続されます。プロデューサはチャネル [A] でリクエストを送信し、チャネル [B] で応答を待ち、コンシューマはチャネル [A] でリクエストを待ち、チャネル [B] で応答を送信します。ただし、コンシューマが応答を計算するコールバックを実行するとブロックされるように見えるため、各コンシューマで一度に実行されるタスクは 1 つだけです。

最後に必要なもの:

  1. コンシューマー [A] は自分のタスク (毎回約 5k) をクラスターにサブスクライブします。
  2. ブローカは各コンシューマに対して N 個のメッセージ/リクエストをディスパッチします。ここで、N は処理できる同時タスクの数です。
  3. 1 つのタスクが完了すると、コンシューマーはブローカー/プロデューサーに結果を返信します。
  4. プロデューサーは応答を受け取り、計算ステータスを更新し、最後にいくつかのレポートを出力します

制限:

  • 別のユーザーが作業を送信すると、そのユーザーのすべてのタスクが前のユーザーの後にキューに入れられます (これはキュー システムから自動的に当てはまると思いますが、スレッド環境での影響については考えていません)。
  • タスクには提出する順序がありますが、回答する順序は重要ではありません

アップデート

私はもう少し勉強しましたが、私の実際の問題は、ピカの SelectConnection.channel.basic_consume() 関数へのコールバックとして単純な関数を使用しているようです。私の最後の (未実装の) アイデアは、通常の関数の代わりにスレッド関数を渡すことです。これにより、コールバックがブロックされず、消費者がリッスンし続けることができます。

4

3 に答える 3

0

スレッド化の経験がないので、私のセットアップは複数のコンシューマープロセスを実行します(その数は基本的にプリフェッチカウントです)。それぞれが2つのキューに接続し、お互いの存在を知らずに、ジョブを楽しく処理します。

于 2013-03-06T11:00:51.870 に答える
0

あなたのセットアップは私にはいいですね。その通りです。スレッドを開始するようにコールバックを設定し、スレッドがチャネル B を介して応答をキューに戻すのが終了したときに、それを別のコールバックにチェーンすることができます。

基本的に、コンシューマーには独自のキューが必要です (サイズ N、サポートされる並列処理の量)。チャネル A 経由でリクエストが来ると、Pika を使用するメイン スレッドとスレッド プール内のワーカー スレッドの間で共有されるキューに結果を格納する必要があります。キューに入れられるとすぐに、pika は ACK で応答し、ワーカー スレッドが起動して処理を開始します。

作業が完了すると、ワーカーは結果を別の結果キューに戻し、メイン スレッドにコールバックを発行してコンシューマーに送り返します。

ワーカー スレッドが共有リソースを使用している場合は、ワーカー スレッドが互いに干渉しないように注意する必要がありますが、それは別のトピックです。

于 2012-04-08T14:20:49.900 に答える