私のアプリケーションのプロデューサー モジュールは、小さなクラスターで実行する作業を送信したいユーザーによって実行されます。RabbitMQ メッセージ ブローカーを介して JSON 形式でサブスクリプションを送信します。
私はいくつかの戦略を試しましたが、これまでのところ最良の方法は次のとおりですが、まだ完全には機能していません。
各クラスター マシンはコンシューマー モジュールを実行します。このモジュールは AMQP キューにサブスクライブし、prefetch_countを発行して、一度に実行できるタスクの数をブローカーに伝えます。
Pika AMQP ライブラリの SelectConnection を使用して動作させることができました。コンシューマーとプロデューサーの両方が 2 つのチャネルを開始し、1 つは各キューに接続されます。プロデューサはチャネル [A] でリクエストを送信し、チャネル [B] で応答を待ち、コンシューマはチャネル [A] でリクエストを待ち、チャネル [B] で応答を送信します。ただし、コンシューマが応答を計算するコールバックを実行するとブロックされるように見えるため、各コンシューマで一度に実行されるタスクは 1 つだけです。
最後に必要なもの:
- コンシューマー [A] は自分のタスク (毎回約 5k) をクラスターにサブスクライブします。
- ブローカは各コンシューマに対して N 個のメッセージ/リクエストをディスパッチします。ここで、N は処理できる同時タスクの数です。
- 1 つのタスクが完了すると、コンシューマーはブローカー/プロデューサーに結果を返信します。
- プロデューサーは応答を受け取り、計算ステータスを更新し、最後にいくつかのレポートを出力します
制限:
- 別のユーザーが作業を送信すると、そのユーザーのすべてのタスクが前のユーザーの後にキューに入れられます (これはキュー システムから自動的に当てはまると思いますが、スレッド環境での影響については考えていません)。
- タスクには提出する順序がありますが、回答する順序は重要ではありません
アップデート
私はもう少し勉強しましたが、私の実際の問題は、ピカの SelectConnection.channel.basic_consume() 関数へのコールバックとして単純な関数を使用しているようです。私の最後の (未実装の) アイデアは、通常の関数の代わりにスレッド関数を渡すことです。これにより、コールバックがブロックされず、消費者がリッスンし続けることができます。