1

Java を使用してさまざまなパブリッシャー/サブスクライバー パターンを実装したいと考えていますが、現在アイデアが不足しています。

1 つのパブリッシャーと N 人のサブスクライバーがあり、パブリッシャーはオブジェクトをパブリッシュし、各サブスクライバーは各オブジェクトを 1 回だけ正しい順序で処理する必要があります。パブリッシャーと各サブスクライバーは、独自のスレッドで実行されます。

私の最初の実装では、各サブスクライバーには独自のブロッキング キューがあり、パブリッシャーはオブジェクトを各サブスクライバーのキューに入れました。これは正常に機能しますが、サブスクライバーのキューがいっぱいになると、パブリッシャーはブロックされます。これは、各サブスクライバーがオブジェクトの処理に異なる時間を要するため、パフォーマンスの低下につながります。

次に、別の実装では、パブリッシャーはオブジェクトを独自のキューに保持します。オブジェクトとともに、AtomicInteger カウンターがそこにあるサブスクライバーの数に関連付けられています。次に、各サブスクライバーはキューを確認してカウンターを減らし、カウンターがゼロになったらキューから削除します。

このようにして、パブリッシャーはブロックから解放されますが、サブスクライバーは、次のオブジェクトがピークされる前に、キューからオブジェクトを削除して、互いにオブジェクトを処理するのを待つ必要があります。

これを行うより良い方法はありますか?これはかなり一般的なパターンだと思います。

4

3 に答える 3

0

あなたの「多くのキュー」の実装が道です。完了するまでの全体的な時間には影響しないため、1 つの完全なキューがプロデューサーをブロックすることを必ずしも心配する必要はないと思います。3 つのコンシューマーがあり、2 つは 1 秒あたり 1 の割合で消費し、3 つ目は 5 秒あたり 1 の割合で消費し、一方、プロデューサーは 2 秒あたり 1 の割合で生成するとします。最終的に 3 番目のキューがいっぱいになると、プロデューサーはそれをブロックし、1 番目と 2 番目のキューへのアイテムの配置も停止します。これを回避する方法はありますが、3 番目の消費者が常にボトルネックになる。100 個のアイテムを生成/消費している場合、これには 3 番目のコンシューマーのために少なくとも 500 秒かかります (5 秒 x 100 アイテム)。 3 番目のキューがいっぱいになった後でもプロデューサーがキューを引き続き満たすことができるようにするための巧妙な方法を実行した場合、または 500 秒後に終了した場合 (プロデューサーが 3 番目のキューでブロックされたため)。

于 2013-05-31T16:33:29.957 に答える
0

1 つのパブリッシャーと N 人のサブスクライバーがあり、パブリッシャーはオブジェクトをパブリッシュし、各サブスクライバーは各オブジェクトを 1 回だけ正しい順序で処理する必要があります。パブリッシャーと各サブスクライバーは、独自のスレッドで実行されます。

このアーキテクチャを変更します。最初はサブスクライバーごとのキューを検討しましたが、そのメカニズムは好きではありません。たとえば、最初のサブスクライバーの実行に時間がかかる場合、すべてのジョブがそのキューに入れられ、1 スレッドの作業のみを行うことになります。

サブスクライバーを順番に実行する必要があるため、すべてのサブスクライバーを介して各メッセージを実行するスレッドのプールが必要です。サブスクライバーへの呼び出しは再入可能である必要がありますが、これは不可能な場合があります。

したがって、10 個のスレッドのプール (たとえば) があり、それぞれがパブリッシャーのキューからデキューされ、次のようなことを行います。

public void run() {
    while (!shutdown && !Thread.currentThread().isInterrupted()) {
        Article article = publisherQueue.take();
        for (Subscriber subscriber : subscriberList) {
           subscriber.process(article);
        }
    }
}
于 2013-05-31T17:42:38.270 に答える