1

私は基本的にFuture<List<T>>、サーバーからバッチでフェッチされる を持っています。一部のクライアントでは、将来が満たされたときにコレクション全体に加えて、ロード中に増分結果を提供したいと考えています。

これのためにどこかに定義された一般的な Future 拡張機能はありますか? そのような先物に存在する典型的なパターン/コンビネーターは何ですか?

与えられた操作IncrementalListFuture<T>を簡単に定義できると思いますmap。他に何が思い浮かびますか?

4

5 に答える 5

3

このためにどこかに定義された共通のFuture拡張機能はありますか?

からの増分結果について話していると思いますExecutorService。オブジェクトExecutorCompletionServiceの1つが取得可能になるとすぐに通知されるようにするanの使用を検討する必要があります。Future

javadocsから引用するには:

CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers) {
    ecs.submit(s);
}
int n = solvers.size();
for (int i = 0; i < n; ++i) {
    // this waits for one of the futures to finish and provide a result
    Future<Result> future = ecs.take();
    Result result = future.get();
    if (result != null) {
        // do something with the result
    }
}

ごめん。私は最初に質問を読み間違え、あなたがリスト<未来<?>>について質問していると思いました。コードをリファクタリングして実際に多くの先物を返すことができるかもしれないので、これは後世のために残しておきます。

この場合、リストを返しませんFuture。仕事が終わるまで、あなたはリターンを得ることができないでしょう。

可能であれば、呼び出し元とスレッドの両方がアクセスできるように、ある種のパスを渡します。BlockingQueue

final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
// build out job with the queue
threadPool.submit(new SomeJob(queue));
threadPool.shutdown();
// now we can consume from the queue as it is built:
while (true) {
   T result = queue.take();
   // you could some constant result object to mean that the job finished
   if (result == SOME_END_OBJECT) {
      break;
   }
   // provide intermediate results 
}

また、ジョブクラス内で定義されSomeJob.take()た内部を呼び出すある種のメソッドを持つこともできます。BlockingQueue

// the blocking queue in this case is hidden inside your job object
T result = someJob.take();
...
于 2012-09-10T18:17:11.280 に答える
1

これが私がすることです:

  1. リストにデータを入力するスレッドで、次を使用してリストをラップすることにより、スレッドセーフにします。Collections.synchronizedList
  2. リストを公開するが、リストを返すスレッドに public メソッドを追加することによって変更できないようにするCollections.unmodifiableList
  3. クライアントに Future> を与える代わりに、スレッドへのハンドルまたは何らかのラッパーを与えて、上記の public メソッドを呼び出せるようにします。

または、グレイが示唆したように、BlockingQueues はこのようなスレッドの調整に最適です。ただし、これにはクライアント コードをさらに変更する必要がある場合があります。

于 2012-09-10T18:47:21.097 に答える
1

Observables の代わりに、twitter のアプローチを見ることができます。

Streamの非同期バージョンであるSpoolを使用します。

基本的には、List

trait Spool[+A] {

  def head: A

  /**
   * The (deferred) tail of the spool. Invalid for empty spools.
   */
  def tail: Future[Spool[A]]
}

mapfilterおよびそのforeach上に機能的なことを行うことができます。

于 2013-02-13T07:49:06.873 に答える
1

私自身の質問に答えるために: 最近、この分野で多くの開発が行われています。最もよく使用されるのは、Play iteratees ( http://www.playframework.org/documentation/2.0/Iteratees ) と Rx for .NET ( http://msdn.microsoft.com/en-us/data/gg577609.aspx )です。

Future の代わりに、次のようなものを定義します。

interface Observable<T> {
    Disposable subscribe(Observer<T> observer);
}

interface Observer<T> {
   void onCompleted();
   void onError(Exception error);
   void onNext(T value);
}

そしてたくさんのコンビネータ。

于 2013-01-30T19:18:47.720 に答える
0

Future は実際には、この方法で中間結果を通信するためではなく、単一の (原子的な) 結果を返すように設計されています。実際にやりたいことは、バッチごとに 1 つずつ、複数の先物を使用することです。

さまざまなリモートサーバーから取得する必要があるものがたくさんあり、それぞれが異なる時間に戻ってくるという同様の要件があります。最後のものが返されるまで待つのではなく、返された順に処理します。このために、インターフェースの使用を完全に抽象化して、反復でブロックする を受け取って返すAsyncCompleterを作成しました。Iterable<Callable<T>>Iterable<T>Future

そのクラスがどのように実装されているかを見ると、自分でこれを構築する必要がある場合に、利用可能になった順序でCompletionServiceから結果を受け取るために を使用する方法がわかります。Executor

編集:基本的に ExecutorCompletionService を使用して、グレイの答えの後半が似ていることがわかりました

于 2012-09-10T22:22:32.567 に答える