10

私たちは、3 つの異なるソースからメッセージを受信することによって一連のオブジェクトが影響を受ける可能性があるアプリケーションに取り組んでいます。(任意のソースからの) 各メッセージには、そのターゲットとして 1 つのオブジェクトがあります。各メッセージ受信者は、独自のスレッドで実行されます。

メッセージの処理 (受信後) をできるだけ高速にしたいので、ターゲット オブジェクトに対するメッセージ処理は、スレッド プールの別のスレッドで実行されます。メッセージの処理には、送信者からのメッセージの読み取り/受信よりも時間がかかります。

プールの各スレッドが特定のオブジェクトのセットのみに専用である場合、より高速になると考えています。たとえば、次のようになります。

Thread1 -> objects named A-L
Thread2 -> objects named M-Z

オブジェクト (またはスレッド) の各セットには、処理中の保留中のメッセージの専用キューがあります。

私の推測では、必要なスレッド同期が各受信スレッドと 1 つの処理スレッドの間だけである場合、ブロック キューにメッセージを入れる必要がある間は、ワーカー スレッドをランダムに割り当てて処理するよりも高速になると思います。メッセージ (この場合、同じオブジェクトに対するメッセージを持つ 2 つの異なるスレッドが存在する可能性があります)。

私の質問は、実際には 2 つの部分です。

  1. ワーカー スレッドを特定のオブジェクト セット専用にする方が優れた/高速なアプローチであるという前提に人々は同意しますか?

  2. これがより良いアプローチであると仮定すると、既存の Java ThreadPool クラスにはこれをサポートする方法がありますか? それとも、独自の ThreadPool 実装をコーディングする必要がありますか?

あなたが提供できるアドバイスをありがとう。

4

5 に答える 5

10

[Is] ワーカー スレッドを特定のオブジェクト セット専用にすることは、より優れた/より高速なアプローチですか?

全体的な目標は、これらの受信メッセージの同時処理を最大化することだと思います。最適に処理されるプールにメッセージを入れる必要がある 3 つのソースからのレシーバーがあります。3 つのソースのいずれかからのメッセージは、同時に処理できない同じターゲット オブジェクトを処理する可能性があるため、同じターゲット オブジェクトを参照しないことが保証されている場合にのみ、メッセージを分割して同時に処理できるようにする必要があります。

hashCode()私はあなたのターゲットオブジェクト(おそらくちょうど)にメソッドを実装しname.hashCode()、値を使用してオブジェクトを の配列に入れBlockingQueue、それぞれが単一のスレッドでそれらを消費します。の配列を使用してExecutors.newSingleThreadExecutor()も問題ありません。キューの数だけハッシュ値モードを変更し、そのキューに入れます。プロセッサーの最大数を事前に定義する必要があります。処理がどの程度 CPU を集中的に使用するかによって異なります。

したがって、次のコードのようなものが機能するはずです。

 private static final int NUM_PROCESSING_QUEUES = 6;
 ...
 ExecutorService[] pools = new ExecutorService[NUM_PROCESSING_QUEUES];
 for (int i = 0; i < pools.length; i++) {
    pools[i] = Executors.newSingleThreadExecutor();
 }
 ...
 // receiver loop:
 while (true) {
    Message message = receiveMessage();
    int hash = Math.abs(message.hashCode());
    // put each message in the appropriate pool based on its hash
    // this assumes message is runnable
    pools[hash % pools.length].submit(message);
 }

このメカニズムの利点の 1 つは、ターゲット オブジェクトに関する同期を制限できることです。同じターゲット オブジェクトが 1 つのスレッドによってのみ更新されることがわかっています。

ワーカー スレッドを特定のオブジェクト セット専用にする方が優れた/高速なアプローチであるという前提に人々は同意しますか?

はい。これは、最適な並行性を得る正しい方法のようです。

これがより良いアプローチであると仮定すると、既存の Java ThreadPool クラスにはこれをサポートする方法がありますか? それとも、独自の ThreadPool 実装をコーディングする必要がありますか?

これを実現するスレッドプールは知りません。ただし、独自の実装は作成しません。上記のコードの概要のように使用してください。

于 2012-11-12T18:36:51.510 に答える
3

一般的に、このようなアプローチは悪い考えです。それは「早期に最適化しない」というマントラに該当します。

さらに、あなたのアイデアが実装された場合、あなたのパフォーマンスを損なう可能性がありますが、それを助けることはできません。うまく機能しない簡単な例の1つは、あるタイプで突然大量のリクエストを受け取った場合です。もう一方のワーカースレッドはアイドル状態になります。

最善のアプローチは、標準の生産者/消費者パターンを使用し、さまざまな負荷の下でシステムテストを行うことによって、理想的には実際のトランザクションの記録をフィードすることによって、消費者スレッドの数を調整することです。

これらの状況の「移動」フレームワークは、java.util.concurrentパッケージのクラスです。ファクトリメソッドの1つから作成されたBlockingQueue(おそらくArrayBlockingQueue)を使用することをお勧めします。ExecutorServiceExecutorsnewCachedThreadPool()


それを実装してシステムテストを行った後、実績のあるパフォーマンスの問題が見つかった場合は、システムを分析し、ボトルネックを見つけて修正します。

早期に最適化すべきではない理由は、ほとんどの場合、問題が期待した場所にないためです。

于 2012-11-12T18:23:12.403 に答える
0

私の答えは次のとおりです。

  • 1 - はい
  • 2 -
    • a) いいえ
    • b) その必要はありません

いくつかの説明:

  • 何らかのアルゴリズムに従って、1 つのタスクがメッセージを異なるキューに配信するようにしたい場合、
  • メッセージ キューごとに 1 つのタスクで、割り当てられたキューからメッセージを取得して処理する必要があります。

これらの前提が、タスクをスレッドに関連付けるだけの ThreadPool の目的と矛盾しているとは思いません。ただし、このモデルでは、スレッドプールがスレッドをタスクに関連付けるのは 1 回だけであり、スレッドは実行を続けて入力メッセージ キューをポーリングします。

スレッドの摩擦スポットは、中間メッセージ キューと、おそらくこれらのメッセージの処理に関連する他のリソースです。あなたの説明に従って、メッセージ処理をタスクに巧みに分割することによって、第 2 種を最小限に抑えることを計画していると思います。各キューは、パーティショニング タスクとキューに関連付けられた処理タスクによってのみアクセスされる必要があるため、最小限に抑える必要があります。

于 2012-11-12T18:03:58.757 に答える
0

ThreadPoolExecutor に特別な BlockingQueue を提供できるはずです。キューは、どのタイプのメッセージがどのスレッドによって処理されているかを記憶しているため、同じタイプのすべてのメッセージを保留できます。

MyQueue

    ownership relation of thread - msgType 

    take/poll()

        if current thread owns msg type X
            if there is a message of type X
                return that message
            else
                give up ownership

        // current thread does not own any message type
        if there is a messsage of type Y, Y is not owned by any thread
            current thread owns Y
            return that message

        // there's no message belonging to an unowned type
        wait then retry 
于 2012-11-12T19:18:54.833 に答える
0

別の方法として、 RabbitMQActiveMQなどの既存のフレームワークを使用することをお勧めします。独自のメッセージング フレームワークを発明しようとするのは難しい場合があります。独自のフレームワークで価値を追加しようとしている場合、それは 1 つのことです。目標を達成するために 1 つだけ必要な場合、それは別の方法です。これらのフレームワークには、最適なメッセージ配信のための多くのオプションが用意されており、検討する価値があります。

于 2012-11-12T17:41:33.583 に答える