2

私は現在、非同期処理の割り当てを使用するシステムを開発しています。情報の転送はキューを使用して行われます。したがって、あるプロセスが情報をキューに入れ (そして終了し)、別のプロセスがそれを取得して処理します。私の実装では、多くの課題に直面することになり、これらの問題に対する全員のアプローチに興味があります (アーキテクチャとライブラリの観点から)。

絵を描かせてください。3 つのプロセスがあるとします。

Process A -----> Process B
                      |
Process C <-----------|

したがって、プロセス Aはメッセージをキューに入れて終了し、プロセス Bはメッセージを取得して処理し、「リターン」キューに入れます。プロセス Cがメッセージを取得して処理します。

  1. キューからメッセージをリッスンまたは処理しないプロセス Bをどのように処理しますか? Consumer がアクティブでないときに Producer がメッセージを送信できないようにする JMS タイプのメソッドはありますか? したがって、プロセス Aは送信しますが、例外をスローします。
  2. プロセス Cが X 分以内に応答を取得する必要があるが、プロセス Bが (何らかの理由で) 停止したとします。キューにタイムアウトを強制するメカニズムはありますか? したがって、プロセス Cを開始する X 分以内の返信が保証されます。

これらの問題はすべて、何らかのデッドレターキューを使用して処理できますか? タイマーとチェックを使用して、これをすべて手動で行う必要があります。JMS について言及しましたが、私は何に対してもオープンです。実際、私はキューに Hazelcast を使用しています。

これは、利用可能な Java テクノロジとメソッドに関して、よりアーキテクチャ上の問題であることに注意してください。これは適切な問題だと思います。

どんな提案でも大歓迎です。

ありがとう

4

6 に答える 6

2

あなたが尋ねている質問のタイプは、キューと非同期処理があなたの状況に最適なツールではないかもしれないという「におい」のようです。

1) キューの目的を無効にします。同期の要求応答プロセスが必要なようです。

2) 一般的に言えば、プロセス C は応答を得ていません。キューからメッセージを取得しています。キューにメッセージがあり、プロセス C の準備ができている場合は、それを取得します。たとえば、プロセス C は、メッセージを取得すると、そのメッセージが古くなっていると判断できます。

于 2012-02-03T06:53:56.960 に答える
2

あなたの最初の質問は、他の投稿者によってすでに適切に回答されていると思います。

2 番目の質問では、アプリケーションで使用されているメッセージング エンジンによっては、実行しようとしていることが可能になる場合があります。これが IBM MQ で機能することはわかっています。これはWebSphere MQ Classes for Javaを使用して行われていますが、JMS は使用されていません。そのしくみは、プロセス Aがメッセージをキューに入れるときに、応答メッセージを待つ時間を指定することです。プロセス Aが指定された時間内に応答メッセージを受信できなかった場合、システムは適切な例外をスローします。

要求/応答のタイムアウトを希望どおりに処理する標準的な方法が JMS にあるとは思わないので、WebSphere MQ Classes for Java のようなプラットフォーム固有のクラスを使用する必要があるかもしれません。

于 2012-02-03T07:47:17.423 に答える
2

私見、最も簡単な解決策は、ExecutorService、または実行者サービスに基づくソリューションを使用することです。これは、作業のキュー、スケジュールされたタスク (タイムアウト用) をサポートします。

また、単一のプロセスで動作することもできます。(Hazelcast は分散 ExecutorService をサポートしていると思います)

于 2012-02-03T11:10:15.643 に答える
1

不可能な非同期 (メッセージング) セットアップを使用した同期処理のセマンティクスを期待しています。私は WebSphere MQ に取り組んできましたが、通常、コンシューマが終了すると、メッセージは永久にキューに保持されます (有効期限を設定しない限り)。キューがその深さに達すると、後続のメッセージは配信不能キューに移動されます。

于 2012-02-03T07:02:22.390 に答える
1

ええと、キューのポイントの一種は、物事をかなり分離しておくことです。

特定の技術にとらわれていない場合は、キューにデータベースを使用できます。

ただし、まず、2 つのプロセスが確実に調整されるようにするための簡単なメカニズムは、ソケットを使用することです。実用的な場合は、プロセス B に既知のポートでオープン ソケット リスナーを作成させるだけで、プロセス A がそのソケットに接続して監視します。プロセス B がなくなった場合、プロセス A はソケットがシャットダウンされたことを認識でき、プロセス B の問題のアラートとしてそれを使用できます。

B -> C の問題については、db テーブルを用意します。

create table queue (
    id integer,
    payload varchar(100), // or whatever you can use to indicate a payload
    status varchar(1),
    updated timestamp
)

次に、プロセス A は、現在の時刻と「B」のステータスで、そのエントリをキューに入れます。B、キューでリッスンします。

select * from queue where status = 'B' order by updated

B が完了すると、キューを更新してステータスを「C」に設定します。

一方、「C」は次のように DB をポーリングしています。

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(しきい値は、キューで腐敗させたい長さです)。

最後に、C はキューの行を「D」に更新して完了、または削除するか、好きなようにします。

暗い面は、B がちょうど起動している間に C がエントリを取得しようとする可能性があるという、ちょっとした競合状態があることです。おそらく、厳密な分離レベルといくつかのロックでそれを乗り越えることができます。次のように単純なもの:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE

B の select にも FOR UPDATE を使用します。このようにして、選択したレースに勝った人は誰でも、列の排他ロックを取得します.

これにより、実際の機能に関してかなり先に進むことができます。

于 2012-02-03T07:01:12.237 に答える
1

同様のアプローチを使用して、ビデオ トランスコーディング ジョブのキューイングおよび処理システムを作成しました。基本的に、それが機能する方法は次のとおりです。

  1. Process Aは「スケジュール」メッセージを に投稿しArbiter Q、ジョブを「待機中」キューに追加します。
  2. Process Bから次のジョブをリクエストしますArbiter Q。これにより、「待機中」のキューにある次のアイテムが削除されます (1 人のユーザーがトランスコード リクエストを大量に送信できず、他のユーザーがビデオをトランスコードできないようにするためのカスタム スケジューリング ロジックに従います)、それを挿入します。ジョブを に戻す前に、その「処理中」セットに入れProcess Bます。ジョブが「処理」セットに入ると、タイムスタンプが付けられます。
  3. Process Bはジョブを完了し、「完了」メッセージを に投稿しますArbiter Q。これにより、「処理中」セットからジョブが削除され、Process Cジョブが完了したことがわかるように状態が変更されます。
  4. Arbiter Q「処理」セット内のジョブを定期的に検査し、異常に長い時間実行されているジョブをタイムアウトにします。 Process Aその後、必要に応じて、同じジョブを再度キューに入れることができます。

これは JMX を使用して実装されました (JMS の方がはるかに適切でしたが、余談になります)。 Process Aユーザーが開始したトランスコード要求に応答したサーブレット スレッドに過ぎませんでした。 Arbiter Q「スケジュール」および「完了」メッセージを受信した MBean シングルトン (サーバーのクラスター内のすべてのノードにわたって永続化/複製) でした。内部で管理されている「キュー」は単なるListインスタンスであり、ジョブが完了すると、トランスコードされたビデオ ファイルの URL を参照するようにアプリケーションのデータベースの値が変更されました。 Process Bトランスコーディング スレッドでした。その仕事は単純に、ジョブをリクエストしてトランスコードし、完了したら報告することでした。時間の終わりまで何度も何度も。 Process C別のユーザー/サーブレット スレッドでした。URL が使用可能であることを確認し、ダウンロード リンクをユーザーに提示します。

このような場合、Process Bジョブが終了すると、ジョブは「待機」キューに永遠に留まります。しかし、実際にはそうはなりませんでした。あなたが実行していない/実行していない場合は、全体的なアプローチの問題よりもProcess B、展開/構成/実装に問題があることを示唆していると思います。Process B

于 2012-02-03T07:11:11.570 に答える