1

次のような生産者/消費者パターンがあります

  • Executor を介して呼び出される固定数のプロデューサー スレッド。それぞれが独自の BlockingQueue に書き込みます。
  • プロデューサ スレッドを読み取る単一のコンシューマ スレッド

各プロデューサーはデータベース クエリを実行し、その結果をキューに書き込みます。コンシューマーはすべてのプロデューサー キューをポーリングします。現時点で、データベース エラーが発生すると、プロデューサー スレッドが停止し、コンシューマーはプロダクト キューでさらに結果を待つことになります。

キャッチエラーを正しく処理するには、これをどのように構造化する必要がありますか?

4

5 に答える 5

4

私はかつて似たようなことをしたことがあり、死んでいるプロデューサー スレッドがキャッチ ブロックからキューにプッシュするセンチネル値を使用することにしました。例外自体をプッシュすることも (これはほとんどのシナリオで機能します)、そのための特別なオブジェクトを用意することもできます。いずれにせよ、デバッグ目的で例外を消費者にプッシュすることは素晴らしいことです。

于 2012-05-04T10:00:49.750 に答える
1

プロデューサーが死亡したときに唯一の選択肢は、コンシューマーを停止することです。

これを行うには、毒薬を使用できます。これは、プロデューサーが停止時に追加する特別なオブジェクトであり、コンシューマーはそれを受け取ると停止することを知っています。ポイズン ピルはブロックに追加できるfinallyため、生産者がどのように殺されても常に追加されます。

コンシューマーが 1 つしかない場合、1 つのキューを使用します。このようにして、コンシューマーはすべてのプロデューサーが死亡した場所のみをブロックします。

于 2012-05-04T09:58:17.653 に答える
1

実際にキューにプッシュするクラスが何であれ、コンシューマーが失敗をチェックできるように、成功/失敗/エラーメンバーを含める必要があります。

Peter は、キューを 1 つだけ使用することを既に提案しています。すべてのポーリングを回避することが特定の問題になるとは思いません。キュー上のオブジェクトは、必要に応じて、どのプロデューサーから来たかを識別するメンバーと、その他のメタデータを持つことができます。

于 2012-05-04T10:28:12.377 に答える
0

私自身の質問に答えます。

次のクラスを使用しました。Runnable のリストを取得し、それらすべてを並行して実行します。1 つが失敗すると、他のすべてを中断します。次に、プロデューサーとコンシューマーで割り込み処理を行い、中断されたときに正常に終了します。

これは私の場合はうまくいきます。

アイデアをくれたすべてのコメント/回答に感謝します。

// helper class that does the following
//
// if any thread has an exception then interrupt all the others with an eye to cancelling them
// if the thread calling execute() is interrupted then interrupt all the child threads
public class LinkedExecutor
{
    private final Collection<Runnable> runnables;
    private final String name;

    public LinkedExecutor( String name, Collection<Runnable> runnables )
    {
        this.runnables = runnables;
        this.name = name;
    }

    public void execute()
    {
        ExecutorService executorService = Executors.newCachedThreadPool( ConfigurableThreadFactory.newWithPrefix( name ) );

        // use a completion service to poll the results
        CompletionService<Object> completionService = new ExecutorCompletionService<Object>( executorService );

        for ( Runnable runnable : runnables )
        {
            completionService.submit( runnable, null );
        }

        try
        {
            for ( int i = 0; i < runnables.size(); i++ )
            {
                Future<?> future = completionService.take();
                future.get();
            }
        }
        catch ( InterruptedException e )
        {
            // on an interruption of this thread interrupt all sub-threads in the executor
            executorService.shutdownNow();

            throw new RuntimeException( "Executor '" + name + "' interrupted", e );
        }
        catch ( ExecutionException e )
        {
            // on a failure of any of the sub-threads interrupt all the threads
            executorService.shutdownNow();

            throw new RuntimeException( "Execution execution in executor '" + name + "'", e );
        }
    }
}
于 2012-05-04T11:25:53.840 に答える
0

一定時間キューに要素がなくなったときに、タイムアウトを追加してコンシューマーを強制終了することができます。

別のアプローチとして、プロデューサーに「生きている」フラグを維持させ、それを false に設定することで、死にかけていることを知らせることもできます。プロデューサが継続的に実行されているが、常にデータベースから結果を取得できるとは限らない場合、「生きている」フラグは、プロデューサが最後に生きていると報告した時間であり、タイムアウトを使用して、プロデューサが停止した可能性があるかどうかを確認します (最後のレポートが実行されたとき)。生きていたのは昔のことです)。

于 2012-05-04T10:02:43.810 に答える