2

ThreadPoolExecutor と CompletionService を使用した Java のコードがいくつかあります。タスクは大きなバッチでプールに送信されます。結果は完了サービスに送られ、バッチ全体が完了するのを待たずに、利用可能な場合に完了したタスクを収集します。

 ThreadPoolExecutor _executorService =
            new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20));
 CompletionService _completionService =
            new ExecutorCompletionService<Callable>(_executorService)

//submit tasks
_completionService.submit( some task);

//get results
while(...){
   Future result = _completionService.poll(timeout);
   if(result)
      //process result
}

プール内のワーカーの総数は MAX_NUMBER_OF_WORKERS です。利用可能なワーカーなしで送信されたタスクはキューに入れられます。最大 20 個のタスクをキューに入れることができ、その後、タスクは拒否されます。

このアプローチに対応するGparsは何ですか?

gpars 並列処理に関するドキュメントを読んだところcollectManyParallel()anyParallel()、 、 などの多くの潜在的なオプションが見つかりましたが、どれをテストすればよいかわかりfork/joinません。ドキュメントで比較として「完了」または「完了サービス」についての言及を見つけたいと思っていましたが、何も見つかりませんでした。gpars の経験者からどこから始めるべきかについての方向性/指針を探しています。

4

1 に答える 1

1

オンザフライで結果を収集し、プロデューサーを調整する - これにはデータフロー ソリューションが必要です。以下の実行可能なデモのサンプルを見つけてください。

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

int MAX_NUMBER_OF_WORKERS = 10

ThreadPoolExecutor _executorService =
        new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));

final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()

//submit tasks
30.times {value ->
    group.task(new Runnable() {
        @Override
        void run() {
            println 'Starting ' + Thread.currentThread()
            sleep 5000
            println 'Finished ' + Thread.currentThread()
            results.bind(value)
        }
    });
}
group.task {
    results << -1  //stop the consumer eventually
}

//get results
while (true) {
    def result = results.val
    println result
    if (result == -1) break
    //process result
}

group.shutdown()
于 2014-01-14T08:33:20.533 に答える