0

より大きなジョブを処理する JobService があります。ジョブは複数のタスクに動的に分割され、タスクはサブタスクなども生成する可能性があるため、ジョブのタスクの総数を予測することはできません。各タスクはそれ自体をキューに入れ、それを介して実行されますExecutorService.submit(...) 。問題は、「ジョブ キュー」がいつ完了したかを知る唯一の方法はExecutorService.awaitTermination(...). ただし、ジョブとその ExecutorService の間で 単一のスレッドプールを共有できないため、これは非効率的です。

いくつかの代替案を探しています。各ジョブに AtomicInteger を使用することを考えていました。新しいタスクを送信すると増加し、タスクが終了すると減少します。しかし、それがゼロのときをポーリングする必要があり、それは面倒なようで、いくつかの例外処理も混乱しています。

より良い解決策が必要なようですか?

4

2 に答える 2

4

Submit は、タスクの完了を待機するために使用できる Future オブジェクトを返します。これらを追跡し、すべてのサブタスクが完了するまで再帰的にブロックするメソッドを追加できます。このようにして、必要な場所でエグゼキュータを再利用できます。

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class JobExecutor {
    ExecutorService executorService = Executors.newFixedThreadPool(1);

    private class Task implements Runnable {
        private final String name;
        private final Task[] subtasks;
        private final ExecutorService executorService;
        private volatile boolean started = false;
        private Future<?> taskFuture;

        // Separate list from subtasks because this is what you'll probably
        // actually use as you may not be passing subtasks as constructor args
        private final List<Task> subtasksToWaitOn = new ArrayList<Task>();

        public Task(String name, ExecutorService executorService,
                Task... subtasks) {
            this.name = name;
            this.executorService = executorService;
            this.subtasks = subtasks;
        }

        public synchronized void start() {
            if (!started) {
                started = true;
                taskFuture = executorService.submit(this);
            }
        }

        public synchronized void blockTillDone() {
            if (started) {
                try {
                    taskFuture.get();
                } catch (InterruptedException e) {
                    // TODO Handle
                } catch (ExecutionException e) {
                    // TODO Handle
                }
                for (Task subtaskToWaitOn : subtasksToWaitOn) {
                    subtaskToWaitOn.blockTillDone();
                }
            } else {
                // TODO throw exception
            }
        }

        @Override
        public void run() {
            for (Task subtask : subtasks) {
                subtask.start();
                subtasksToWaitOn.add(subtask);
            }
            System.out.println("My name is: " + name);
        }
    }

    void testSubmit() {
        Task subsubTask1 = new Task("Subsubtask1", executorService);
        Task subtask1 = new Task("Subtask1", executorService, subsubTask1);
        Task subtask2 = new Task("Subtask2", executorService);
        Task subtask3 = new Task("Subtask3", executorService);
        Task job = new Task("Job", executorService, subtask1, subtask2,
                subtask3);
        job.start();
        job.blockTillDone();
        System.out.println("Job done!");
    }

    public static void main(String[] args) {
        new JobExecutor().testSubmit();
    }
}

プリントアウト:

My name is: Job
My name is: Subtask1
My name is: Subtask2
My name is: Subtask3
My name is: Subsubtask1
Job done!
于 2012-08-09T03:02:00.053 に答える
1

Java7 (またはバックポート ライブラリhttp://www.cs.washington.edu/homes/djg/teachingMaterials/grossmanSPAC_forkJoinFramework.htmlを使用する Java6 ) を使用している場合は、この種の Fork-Join プールを検討することをお勧めします。もの:

class MainTask extends RecursiveTask<Long> {

    @Override
    protected Long compute() {
        SubTask subtask0 = new SubTask(0L);
        SubTask subtask1 = new SubTask(1L);
        SubTask subtask2 = new SubTask(2L);
        SubTask subtask3 = new SubTask(3L);
        SubTask subtask4 = new SubTask(4L);
        SubTask subtask5 = new SubTask(5L);

        subtask1.fork();
        subtask2.fork();
        subtask3.fork();
        subtask4.fork();
        subtask5.fork();

        return subtask0.compute() +
                subtask1.join() +
                subtask2.join() +
                subtask3.join() +
                subtask4.join() +
                subtask5.join();
    }

}

class SubTask extends RecursiveTask<Long> {
    private Long rawResult = null;

    private Long expected = null;

    public SubTask(long expected) {
        this.expected = expected;
    }

    @Override
    protected Long compute() {
        return expected;
    }
}

public static void main( String[] args )
{
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Long result = forkJoinPool.invoke(new MainTask());
    System.out.println(result);
}

明らかに、これにはサブタスクがハードコードされていますが、パラメーターをメインタスクに渡して、それを使用してサブタスクを生成できない理由はありません。サブタスク自体はすべて同じタイプである必要はありませんが、すべて RecursiveTask を拡張する必要があります。現実的には、タスクがサブタスク (上記の MainTask など) を生成する場合、現在のスレッドがいくつかの計算を実行できるように、少なくとも 1 つのサブタスクで直接呼び出される "compute" が必要です (fork と join ではなく)。残りはスレッドが行います。

于 2012-08-09T04:17:57.890 に答える