5

ExecutorService次のセマンティクスを提供する実装を探しています。各スレッドは、入力に基づいて何らかのタスクを実行する「ワーカー」によって占有されます。各ワーカーは単一のスレッドでのみ実行されることが保証されているため、単一のスレッドで自分自身と同期するため、同期のオーバーヘッドなしでタスクからタスクへの状態を維持できるようにする必要があります。

100 個の入力と 10 個のワーカーがあるとします。次のように記述できるようにしたいと考えています。

for (Input input: inputs) {
    // The following code would pass t to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

各ワーカーは、与えられた入力に対して異なることを行うことに注意してください。入力は実行可能なコード ブロックではなく、ワーカーへの単なるパラメーターです。各ワーカーは、入力をどうするかを決定します。ただし、簡単にするために、ワーカーは、入力を受け取ってポリモーフィックに呼び出すことができるインターフェイスを実装します。

を使用して機能するものを一緒にハックしましたMap<Worker, WorkerExecutor>。ここで、WorkerExecutorは a の周りのシンラッパーでありExecutors.newSingleThreadPool、Worker の単一のインスタンスのみが各スレッドプールで実行されます。私は彼らが何をしているかを知っている誰かによって書かれたものを見つけたいと思います:-)


潜在的な非効率性

この種のセマンティクスが非効率になることは理解していますが、開発時間の点で最大限の利益を得ようとしており、Worker の各実装をスレッドセーフになるように再設計することは簡単ではありません。私が意味する非効率性は、実行が次のようになる可能性があることです (この例では最大 2 つのアクティブなスレッドをシミュレートします)。

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

問題は、Worker 3 が完了すると、実行するタスクが残っておらず、Worker 4 が完了するまで作業を実行できないことです。これは、CPU をアイドル状態のままにしておくことができる任意の長い時間になる可能性があります。


そのようなものはExecutorService存在しますか?

4

2 に答える 2

2

あなたが実際に望んでいるのは俳優のようです。簡単に言うと、アクターは単一のスレッドで実行され、順次処理を担当するタスクの「メールボックス」を持つオブジェクトです。Akkaは、JVM でアクターを提供する現在の主要なライブラリ/フレームワークのようです。あそこを見てください。

于 2013-02-23T21:02:11.250 に答える
1

次のようなもの:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// you implement this for each of your non-parallelisable jobbies
interface Worker<T> {
    public void process(T input);
}

// implementation detail
class Clerk<T> {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    public Clerk(Worker<T> worker) {
        this.worker = worker;
    }

    public void process(final T input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                worker.process(input);
            }
        });
    }
}

// make one of these, and give it all your workers, then give it input
class Workshop<T> {
    private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();

    public void addWorker(Worker<T> worker) {
        // mutable; you love it
        clerks.add(new Clerk<T>(worker));
    }

    public void process(T input) {
        for (Clerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    public void processAll(Iterable<T> inputs) {
        for (T input : inputs) {
            process(input);
        }
    }
}

多分?

于 2013-02-27T18:30:40.193 に答える