7

同じ InputStream を同時に処理する N 個のコンシューマー スレッドを生成する必要があります。たとえば、何らかの方法で変換したり、チェックサムやデジタル署名を計算したりします。データのソース。

だから私にできることは、InputStreamの実装を作成することです。

  • 「親」ストリームからデータのチャンクを読み取る
  • 消費者のブロックを解除
  • すべての消費者がチャンク全体を読み取るまで待ちます
  • 次のチャンクを読む

シンプルに見えますが、特定のコンシューマーが死亡したときのライブロック、すべての InputStream メソッドの実装、バリア/ラッチを使用したコンシューマー自体のフォーク/ジョインの制御など、さまざまな問題が発生する可能性があります。

ある仲間は、実装に 30 分かかると言ってくれました。

十分に成熟したものを使用することをお勧めします (グーグル検索では結果が得られなかったので、私の google-fu では十分ではありませんか?)、または「ソース」ストリーム全体を一時ファイルにコピーして、それを次のように使用する必要はありません。データのソース。後者のソリューションはより信頼性が高いように見えますが、最終的にギガバイトのファイルが作成される可能性があります (たとえば、ストリーミング オーディオを処理する場合)。

4

3 に答える 3

3

私の見方では、現在最も遅いコンシューマーによってすべてが常に行き詰まることなく、さまざまなコンシューマーが異なるペースでストリームを移動できるように、少なくとも何らかのバッファリングが必要です。これにより、基本的に最悪の場合のパフォーマンスが保証され、並行性のメリットはほとんどありません。

たとえば、各チャンクにこれまでに使用したコンシューマのタグを付けてから、完全に使い切ったコンシューマを削除できます。おそらくこれは、各消費者がまだ使用していない各チャンクへの参照を保持することで実現できます。これにより、GC が使用済みのチャンクを自動的に処理できるようになります。プロデューサは、WeakReferenceまだ使用されていないチャンクの数を把握し、それに基づいてスロットリングを行うため、チャンクへの s のリストを保持する場合があります。

InputStreamまた、プロデューサーと内部的に通信するスレッドごとに個別のインスタンスを用意することも考えていますInputStream。このようにして、ライブロックの危険を簡単に解決できますtry ... finally { is.close(); }。これはプロデューサーに伝えられます。

ArrayBlockingQueue消費者ごとに使用するアイデアがいくつかあります。プロデューサをブロックまたはビジー待機させずに、すべてのコンシューマに適切に供給されることを保証することは、いくらか困難です。

于 2012-07-04T20:26:04.467 に答える
0

Apache ActiveMQのようなJavaメッセージングサービス(JMS)の実装を試すことができます。

あなたの場合、いわゆるトピックを作成する必要があります(トピックとキューを参照)。トピックはプロデューサーによって作成され、N個のコンシューマーに公開されます。これらのコンシューマーは同時に実行され、各コンシューマーはまったく同じデータを受け取ります。

を使用したいので、メッセージInputStreamを送信する方法に関する章がストリームです。

通常、プロデューサーとコンシューマーは別々のプロセスであり、おそらくネットワーク上の異なるマシンで実行されていると思います。ただし、単一のJVMで完全に実行するように構成できると思います。これは、JMSの実装によって異なります。これらも非常に有名です。JBossによるHornetQRabbitMQ、その他多数。

于 2012-07-06T15:35:19.477 に答える
0

パイプ ストリームの使用を検討しましたか? プロデューサーは、ファイルから読み取ったものをスローする 1 つ以上のPipedOuputStreamを持つことができます。パイプの反対側では、対応するPipedInputstream (ライブラリと共有できる InputStream) を読み取るさまざまなコンシューマ スレッドがあります。

プロデューサ スレッドは、パイプの反対側で読み取りを行う特定のコンシューマ スレッドに処理するデータを提供することにより、どのパイプ データを送信するかを決定できます。

コンシューマ スレッドからデータを取得する必要がある場合は、反対方向に別のパイプを作成して、データを送り返すことができます。

于 2012-07-04T20:40:50.970 に答える