7

私は最初にここでこの質問をしましたが、私の質問はしばらくの間真のループに関するものではないことに気づきました。私が知りたいのは、Javaで高性能の非同期メッセージパッシングを行う適切な方法は何ですか?

私がやろうとしていること...

私には約10,000人のコンシューマーがいて、それぞれがプライベートキューからのメッセージを消費しています。メッセージを1つずつ生成し、それらを正しいコンシューマーのキューに入れるスレッドが1つあります。各コンシューマーは無期限にループし、キューに表示されるメッセージをチェックして処理します。

プロデューサーは1人で、各コンシューマーはプライベートキューでのみ機能するため、「単一のプロデューサー/単一のコンシューマー」という用語だと思います(複数のコンシューマーが同じキューから読み取ることはありません)。

Consumer.javaの内部:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}

プロデューサーは、メッセージをコンシューマーメッセージキュー内に急速に配置しています(1秒あたり数百万メッセージ)。消費者はこれらのメッセージをできるだけ速く処理する必要があります!

注:はwhile (true) { ... }、最後のメッセージとしてプロデューサーによって送信されたKILLメッセージによって終了します。

ただし、私の質問は、このメッセージパッシングを設計する適切な方法についてです。messageQueueにはどのような種類のキューを使用する必要がありますか?同期または非同期のどちらにする必要がありますか?メッセージはどのように設計する必要がありますか?while-trueループを使用する必要がありますか?コンシューマーはスレッド、または他の何かである必要がありますか?10,000スレッドはクロールまで遅くなりますか?スレッドに代わるものは何ですか?

では、Javaで高性能のメッセージパッシングを行うための適切な方法は何でしょうか。

4

5 に答える 5

6

メモリのオーバーヘッドは言うまでもなく、10,000スレッドのコンテキスト切り替えのオーバーヘッドは非常に高くなると思います。デフォルトでは、32ビットプラットフォームでは、各スレッドはデフォルトのスタックサイズ256kbを使用するため、スタックだけで2.5GBになります。明らかにあなたは64ビットを話しているが、それでも、それはかなり大量のメモリである。使用されるメモリの量が原因で、キャッシュは大量にスラッシングされ、CPUはメモリ帯域幅によって抑制されます。

大量のスタックとコンテキスト切り替えのオーバーヘッドを割り当てないようにするために、非常に多くのスレッドを使用しないようにする設計を探します。10,000スレッドを同時に処理することはできません。現在のハードウェアのコア数は通常100未満です。

ハードウェアスレッドごとに1つのキューを作成し、ラウンドロビン方式でメッセージをディスパッチします。処理時間が大幅に異なる場合、一部のスレッドは追加の作業が与えられる前にキューの処理を終了し、他のスレッドは割り当てられた作業を完了しないという危険性があります。これは、JSR-166 ForkJoinフレームワークに実装されているように、ワークスティーリングを使用することで回避できます。

通信は発行者からサブスクライバーへの1つの方法であるため、メッセージが発行された後、サブスクライバーがメッセージを変更しないと仮定すると、メッセージに特別な設計は必要ありません。

編集:コメントを読んで、10,000個のシンボルがある場合は、少数の汎用サブスクライバースレッド(コアごとに1つのサブスクライバースレッド)を作成します。これにより、パブリッシャーからのメッセージを非同期で受信します(たとえば、メッセージキューを介して)。サブスクライバーは、キューからメッセージをプルし、メッセージからシンボルを取得し、これをメッセージハンドラーのマップで検索し、ハンドラーを取得して、ハンドラーを呼び出してメッセージを同期的に処理します。完了すると、それが繰り返され、キューから次のメッセージがフェッチされます。同じシンボルのメッセージを順番に処理する必要がある場合(これが10,000キューが必要だと推測している理由です)、シンボルをサブスクライバーにマップする必要があります。たとえば、サブスクライバーが10人の場合、シンボル0〜999はサブスクライバー0に、1000〜1999はサブスクライバー1に移動します。より洗練されたスキームは、各加入者がほぼ同じ負荷を受けるように、頻度分布に従ってシンボルをマップすることです。たとえば、トラフィックの10%がシンボル0である場合、サブスクライバ0はその1つのシンボルだけを処理し、他のシンボルは他のサブスクライバに分散されます。

于 2010-07-29T22:29:06.960 に答える
2

これを使用できます (クレジットは、 Java でのどの ThreadPool を使用する必要がありますか? ):

class Main {
    ExecutorService threadPool = Executors.newFixedThreadPool(
                                     Runtime.availableProcessors()*2);

    public static void main(String[] args){
        Set<Consumer> consumers = getConsumers(threadPool);
        for(Consumer consumer : consumers){
            threadPool.execute(consumer);
        }
    }
}

class Consumer {
    private final ExecutorService tp;
    private final MessageQueue messageQueue;
    Consumer(ExecutorService tp,MessageQueue queue){
        this.tp = tp;
        this.messageQueue = queue;
    }
    @Override
    public void run(){
              Message msg = messageQueue.poll();

              if (msg != null) {
                  try{
                       ... // do something with the message
                  finally{
                       this.tp.execute(this);
                  }
              }
           }
    }
}    

このようにして、ほとんど手間をかけずに適切なスケジューリングを行うことができます。

于 2010-07-29T23:45:59.393 に答える
1

まず第一に、完全な設計ドキュメントを作成するか、自分でさまざまなアプローチを試してみない限り、正解は 1 つではありません。

私はあなたの処理が計算集約的ではないだろうと仮定しています。考えられる解決策の 1 つは、CPU ごとに 1 ~ 2 スレッドを使用して、コンテキストの切り替えを最小限に抑えることです。システムが厳密なリアルタイムでデータを処理する場合を除き、各キューでより大きな遅延が発生する可能性がありますが、全体的なスループットは向上します。

たとえば、プロデューサー スレッドを独自の CPU で実行し、メッセージのバッチをコンシューマー スレッドに配置します。次に、各コンシューマ スレッドはメッセージをその N 個のプライベート キューに配布し、処理手順を実行し、新しいデータ バッチを受信します。繰り返しますが、遅延許容度に依存するため、処理ステップは、時間のしきい値に達しない限り、すべてのキュー、一定数のキュー、可能な限り多くのキューを処理することを意味する場合があります。どのキューがどのコンシューマー スレッドに属しているかを簡単に判断できると (たとえば、キュ​​ーに順番に番号が付けられている場合: int consumerThreadNum = queueNum & 0x03)、毎回ハッシュ テーブルでそれらを検索するのは遅くなる可能性があるため、有益です。

メモリのスラッシングを最小限に抑えるために、キューを常に作成/破棄することはあまり良い考えではない可能性があるため、スレッドごとに (キューの最大数/コアの数) キュー オブジェクトを事前に割り当てたい場合があります。キューが破棄される代わりに終了すると、クリアして再利用できます。gc があまりにも頻繁に、また長時間にわたって邪魔をするのは望ましくありません。

もう 1 つの不明な点は、プロデューサーが各キューの完全なデータ セットを生成するか、KILL コマンドを受信するまでデータをチャンクで送信するかです。プロデューサが完全なデータ セットを送信する場合は、キューの概念を完全に廃止して、コンシューマ スレッドに到着したデータを処理するだけでかまいません。

于 2010-07-29T23:00:37.100 に答える
0

ハードウェアと OS の容量に関連するコンシューマ スレッドのプールを用意します。これらのコンシューマー スレッドは、メッセージ キューをポーリングする可能性があります。

Messages に自分自身を処理する方法を知ってもらうか、初期化時にプロセッサをコンシューマ スレッド クラスに登録します。

于 2010-07-29T22:33:11.503 に答える
0

シンボルの処理に関する制約についての詳細がないため、非常に具体的なアドバイスを提供することは困難です。

このスラッシュドットの記事をご覧ください。

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

多くのスレッド対単一選択対スレッドプール引数に関するかなりの議論と実際の測定データがあります。

于 2010-07-29T23:06:02.843 に答える