0

次のようなサブシステムがあります。

        [read]          [decode]       [deliver] 
Byte      -->  Undecoded  -->   Decoded   -->  Output queue
stream          message         message     

入力はソケット/バイト ストリームです。最初のステップは、メッセージを読むことです。次のステップは、メッセージをデコードすることです (そして、メッセージ オブジェクト内に結果を格納します)。最後のステップは、メッセージを配信することです。

デコード手順を並列化したいのですが、出力順序を入力順序と同じに保つ必要があります。したがって、メッセージ A と B が受信され、メッセージ B のデコードがより高速である場合、A の配信が完了するまで待たなければなりません。

Java で単純な初期実装を作成しましたが、プロファイリングによると、ハンドオーバーの手順 (「ストリーム リーダー」から「デコーダー」まで、および「デコーダー」から出力までの両方) で多くのことを失いました。24 コアのコンピューター (ハイパー スレッディングを含む) でテスト プログラムを実行すると、次のようになります。

  • シングルスレッド実装の実行時は 1100 K メッセージ/秒。
  • 単純な 12 スレッド実装 (多くのキューを使用) を実行すると、110 K メッセージ/秒。

私の単純な実装は、http://pastebin.com/be1JqZy3で入手できます。これは 200 行を超えるコードなので、並列バージョンをシリアル バージョンよりも 10 倍遅くする方法を本当に知りたい人だけに興味があるでしょう (ヒント: クラス ThreadPoolDecoder を見てください)。

この種の問題を実行するときに使用するパターン/フレームワークを持っている人はいますか?

4

2 に答える 2

2

1100 K msg/s は非常に高速です (メッセージの場合は 1 マイクロ秒未満)。この時間は、キューから/へのメッセージの書き込み/取得の時間 (0.1...1 マイクロ秒) に匹敵します。したがって、並列化を利用するには、割り込み不能な処理の時間を 1 マイクロ秒 (たとえば 1 ミリ秒) より大幅に長く保つ必要があります。これは、小さなメッセージを大きなメッセージにまとめると実行できます。パケットに 1000 個のメッセージを蓄積し、そのパケットを作業単位として処理します。単位を並行して処理します。

于 2012-04-16T15:51:16.367 に答える
2

私が (C# で) 作成したプログラムでこれを処理する方法は、出力に優先キューを設定することです。各レコードには、読み取り時に割り当てられる関連付けられたレコード番号があります。これらの数値は 0 から始まり、増加します。スレッドがレコードの処理を完了すると、そのレコードが優先キューに追加されます。

別の出力スレッドには、ゼロから始まる予想されるレコード番号があります。このスレッドはキューを監視し、予想されるレコード数が追加されるのを待ちます。予想されるレコードが追加されると、スレッドはそれをキューから削除して出力し、予想されるレコード番号をインクリメントして、再試行します。

これは、4 つのスレッドがレコードを処理し、1 つのスレッドが出力を処理する私のアプリケーションでは非常にうまく機能します。

于 2012-04-16T15:09:51.740 に答える