0

私が制御できないレガシー ライブラリと統合しています。

次のインターフェースが定義されています。

interface Factory {
    Future<Void> subscribe(Context c);
}

この「subscribe」メソッドは、さまざまなスレッドによって頻繁に呼び出されます。「Future.get()」の結果が気になるのは、失敗したときだけなので、例外を取得して処理できます。これは、呼び出しスレッドで発生する必要はありません。さらに、「Future.get()」でスレッドの呼び出しをブロックすると、私の場合は非常にコストがかかる可能性があります。成功したとしても、いっぱいになるまでに数秒かかる場合があります。

したがって、私の仕事は、失敗したものをフィルタリングして、これらすべての先物を何らかの方法で「後処理」することです。基本的に、次の 2 つのアプローチが考えられます。

アプローチ #1:

Future のインスタンスを取得したら、必要な処理を行う別の Runnable を外部エグゼキューターに送信します。

    executor.submit(
        new Runnable(){
            @Override
            public void run() {
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                }
            }
        }
    );

このアプローチの欠点は、スレッドを長時間ブロックしている可能性があることです。前述したように、このコード スニペットはかなり頻繁に実行されます。

アプローチ #2:

Future のインスタンスを取得したら、それを何らかのコレクションに配置し、処理を実行するこのコレクションの要素を定期的に実行する別の単一スレッドを専用にします。

    while(true){
        Iterator<Future<Void>> iterator = collection.iterator();
        while(iterator.hasNext()){
            Future<Void> future = iterator.next();
            if(future.isDone()){
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                } finally {
                    iterator.remove();
                }
            }
        }

        TimeUnit.MILLISECONDS.sleep(1000); // sleep
    }

どう思いますか?問題を解決するより良い方法はありますか?

4

1 に答える 1

1

Future最適なオプションの作成に手を加えていないためFuture、カスタマイズされた を使用して、それ自体から処理をトリガーしFutureます。

したがって、あなたの場合、2 つのオプションを組み合わせたようなパターンをお勧めします。Futures を (スレッドセーフな) キューに追加Runnableし、ループ内のすべてのアイテムを処理するエグゼキュータに s を送信します。したがって、 を構成することでスレッドの数を制限できますExecutor。つまり、 ほど多くFutureのスレッドはありませんが、複数のスレッドを持つことができ、これらの後処理スレッドを常に有効にしておく必要はありません。

未完成のアイテムを再キューに入れる際の無限ループを回避するには、ローカル コレクションを使用して、再キューされたアイテムから保留中のアイテムを分離します。

static BlockingQueue<Future<?>> PENDING = …;
static int MAX_ITEMS_PER_JOB = …;
…
/*scheduling code …*/new Runnable() {
  public void run()
  {
    ArrayList<Future<?>> myLocalItems=new ArrayList<>();
    PENDING.drainTo(myLocalItems, MAX_ITEMS_PER_JOB);
    for(Future<?> f:myLocalItems) {
      if(!f.isDone()) PENDING.offer(f); // re-queue
      try {
        f.get();
      } catch(ExecutionException ex) {
        handleException(ex.getCause());
      }
    }
  }
};

したがって、これRunnableは限られた数の をチェックして処理してFutureから返すため、多くのアイテムが保留中の場合は並列処理を行うために複数回送信するのに適していますが、ジョブがハングアップしないため、保留中のアイテムが少ない場合は害はありませんやることがないなら。scheduleWithFixedDelayを使用するのにも適していScheduledExecutorServiceます。

于 2013-10-08T09:21:54.627 に答える