4

タスクのキューがあり、数秒に1回キューをピークするスレッドがあり、タスクがある場合はそれを実行します。

別のコードセクション(もちろん別のスレッド)があり、ループ内にタスクを作成し(ループの外側から事前にタスクの数を知ることはできません)、それらをキューに挿入します。タスクにはいくつかの「結果」オブジェクトが含まれており、外部スレッド(これらのタスクを作成した)は、すべてのタスクが終了するのを待って、最終的に各タスクから結果を取得する必要があります。問題は、モニターの数が事前にわからないため、結果オブジェクトにjava Semaphore\CountDownLatchなどを渡すことができないことです。また、タスクが同期されていないため、invokeAllを使用するExecutorを使用したり、Futureオブジェクトを待機したりすることはできません(外部スレッドはタスクをキューに入れるだけで、時間があるときに別のスレッドがタスクを実行します)。

私が考えていた唯一の解決策は、一連の結果とモニターカウンターを保持する「逆セマフォ」クラスを作成することです。getResult関数は、カウンター== 0であるかどうかを確認し、答えがyesの場合は、ロックオブジェクトに通知し、getResult関数はこのロックを待機します。

public class InvertedSemaphore<T> {
    Set<T> resultSet;
    int usages;
    final Object c;

    public InvertedSemaphore() {
        resultSet = Collections.synchronizedSet(new HashSet<T>());
        usages = 0;
        c = new Object();
    }

    public void addResult(T result) {
        resultSet.add(result);
    }

    public void addResults(Set<T> result) {
        resultSet.addAll(result);
    }


    public void acquire() {
        usages++;
    }

    public void release() {
        synchronized (c) {
            if (--usages == 0) {
                c.notify();
            }
        }
    }

    public Set<T> getResults() {
        synchronized (c) {
            try {
                while (usages > 0) {
                    c.wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return resultSet;
    }

}

各addTaskメソッドはsemaphore.acquireを呼び出し、各(非同期)タスクはタスクの最後にsemaphore.releaseを呼び出します。

それはかなり複雑に聞こえますが、Java並行ライブラリなどでこれに対するより良い解決策があると確信しています。

どんなアイデアでも適用されます:)

4

3 に答える 3

4

タスクを順番に処理する必要がない場合は、ExecutorCompletionServiceを使用します

より一般的には、結果の a を取得するためにinvokeAllon anを使用する必要はありません。をこの目的に使用するか、オプションで、作成中のタスク自体を実装して、タスクの作成者が後で結果を要求できるようにすることができます。ExecutorServiceFutureExecutorService#submitFuture

いくつかのコード:

class MyTask {
    AtomicReference<?> result = new AtomicReference<?>();

    void run() {
       //do stuff here
       result.set(/* the result of the calculation */);
    }

    boolean resultReady() {
        return result.get()!=null;
    }

    ? get() {
        return result.get();
    }
}

... コードの他の場所

void createTasks() {
    Collection<MyTask> c = new ...;

    while(indeterminable condition) {
        MyTask task = new MyTask();
        c.add(task);
        mysteryQueue.add(task);
    }

    while(haven't received all results) {
        MyTask task = c.get(...); //or iterate or whatever
        ? result = task.get();
        if (result!=null) {
            //do stuff, probably remove the task from the collection c would be smart
        }
    }
}
于 2012-07-25T17:19:08.270 に答える
1

1 つのアイデアは、結果に別のキューを使用することです。したがって、スレッドがスレッドにタスクを配置する
1 つのブロッキング キューがあり、それによって生産者と消費者のアプローチが可能になります。各タスクが完了すると、結果は2 番目の結果キューに配置され、最初に作成されたスレッド以降、消費者と生産者の役割が逆転します。タスクは 2 番目のキューからの結果を消費します。 ABA

于 2012-07-25T17:35:01.083 に答える
1

次のことができます。各プロデューサーは独自のキューを保持します。プロデューサーは、このキューに報告する手段をタスク自体に渡します。タスクの実行が終了すると、その結果がこのキューに入れられます。それはいくつかのコードによって記述された獣です:

class Result{}

interface IResultCallback{
    void resultReady(Result r); // this is an abstraction of the queue
}

class Producer implements IResultCallback{ 
    // the producer needs to pass itself to the constructor of the task,
    // the task will only see its "resultReady" facade and will be able to report to it. 
    // the producer can aggragte the results at it will and execute its own computation as 
        // as soon it is ready

    Queue<Result> results; // = init queue

    @Override
    public void resultReady(Result r) {
        results.add(r);

        if(results.size() == 9){
            operate();
        }
        results.clear();
    }

    public void operate(){
        // bla bla
    }
}

public class Task {
    IResultCallback callback;

    public Task(IResultCallback callback){
        this.callback = callback;
    }
    public void execute(){
        // bla bla

        Result r = null; // init result;
        callback.resultReady(r);
    }
}
于 2012-07-25T18:17:40.183 に答える