15

シナリオ(私は物事を単純化しました):

  • 多くのエンド ユーザーは、フロント エンド Web アプリケーション (プロデューサー) からジョブ (たとえば、大きな PDF のレンダリングなどの重いジョブ) を開始できます。
  • ジョブは、単一の永続的な RabbitMQ キューに送信されます。
  • 多くのワーカー アプリケーション (コンシューマー) がこれらのジョブを処理し、結果をデータストアに書き戻します。

このかなり標準的なパターンは正常に機能しています。

問題: ユーザーが同じ 1 分間に 10 個のジョブを開始し、その時間帯に稼働しているワーカー アプリケーションが 10 個しかない場合、このエンド ユーザーは事実上、すべての計算時間を自分自身のために取っています。

質問: エンド ユーザーごとに常に 1 つのジョブのみが処理されるようにするにはどうすればよいですか? (おまけ: 一部のエンド ユーザー (管理者など) は調整してはなりません)

また、エンド ユーザーが同時にジョブを開始するのをフロント エンド アプリケーションがブロックしないようにします。エンドユーザーには、並行ジョブが一度に 1 つずつ終了するのを待ってもらいたいだけです。

ソリューション?: エンド ユーザーごとに 1 つの自動削除専用キューを動的に作成する必要がありますか? はいの場合、ワーカー アプリケーションにこのキューの使用を開始するように指示するにはどうすればよいですか? 1 つ (そして 1 つだけ) のワーカーがこのキューから消費するようにするにはどうすればよいですか?

4

2 に答える 2

6

Dimos が言うように、これを実装するには自分で何かを構築する必要があります。これは、追加のキューといくつかの永続ストレージを必要とする別の実装です。

  • 既存のジョブのキューに加えて、「処理可能なジョブキュー」を作成します。ビジネス ルールを満たすジョブのみがこのキューに追加されます。
  • ジョブ キューのコンシューマー ("Limiter" という名前) を作成します。Limiter には、現在処理中のジョブを記録するための永続ストレージ (Redis やリレーショナル データベースなど) も必要です。リミッタは、ジョブ キューから読み取り、処理可能なジョブ キューに書き込みます。
  • ワーカー アプリケーションがジョブの処理を終了すると、「ジョブ終了」イベントがジョブ キューに追加されます。

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter | 
    ------------     ------------     ----------- 
                         ^                |                    
                         |                V                    
                         |     ------------------------       
                         |    () processable job queue )  
           job finished  |     ------------------------       
                         |                |
                         |                V
                         |     ------------------------
                         \-----| Job Processors (x10) |
                               ------------------------
    

リミッターのロジックは次のとおりです。

  • ジョブ メッセージを受信したら、永続ストレージをチェックして、現在のユーザーに対してジョブが既に実行されているかどうかを確認します。
    • そうでない場合は、ジョブを実行中としてストレージに記録し、ジョブ メッセージを処理可能なジョブ キューに追加します。
    • 既存のジョブが実行中の場合は、そのジョブを保留中のジョブとしてストレージに記録します。
    • ジョブが管理ユーザー向けの場合は、常に処理可能なジョブ キューに追加します。
  • 「ジョブ終了」メッセージを受信したら、そのジョブを永続ストレージの「実行中のジョブ」リストから削除します。次に、そのユーザーの保留中のジョブのストレージを確認します。
    • ジョブが見つかった場合は、そのジョブのステータスを保留中から実行中に変更し、処理可能なジョブ キューに追加します。
    • それ以外の場合は、何もしません。
  • 一度に実行できるリミッタ プロセスのインスタンスは 1 つだけです。これは、リミッター プロセスのインスタンスを 1 つだけ開始するか、永続ストレージのロック メカニズムを使用することで実現できます。

かなり重いですが、何が起こっているかを確認する必要がある場合は、いつでも永続ストレージを調べることができます。

于 2016-10-13T06:32:43.740 に答える
4

このような機能は、rabbitMQ によってネイティブに提供されていません。ただし、次の方法で実装できます。ただし、ポーリングを使用する必要がありますが、これはあまり効率的ではありません (購読/公開と比較して)。また、さまざまなワーカー間の調整のために Zookeeper を活用する必要があります。

2 つのキューを作成します。1 つは優先度の高いキュー (管理ジョブ用) で、もう 1 つは優先度の低いキュー (通常のユーザー ジョブ用) です。10 個のワーカーが両方のキューからメッセージを取得します。各ワーカーは無限ループを実行し (キューが空の場合、理想的にはスリープの間隔を置いて)、各キューから交換可能にメッセージを取得しようとします。

  • 優先度の高いキューの場合、ワーカーはメッセージを取得して処理し、キューに確認応答するだけです。
  • 優先度の低いキューの場合、ワーカーは (特定のファイル znode に書き込むことによって) Zookeeper でロックを保持しようとし、成功した場合はメッセージを読み取り、それを処理して確認します。Zookeeper の書き込みが失敗した場合、他の誰かがロックを保持しているため、このワーカーはこのステップをスキップしてループを繰り返します。
于 2016-10-12T20:23:04.570 に答える