36

非同期タスクをスレッドのプールに委任するプロセスがあります。特定のタスクが順番に実行されるようにする必要があります。たとえば

タスクが順番に到着する

タスク a1、b1、c1、d1、e1、a2、a3、b2、f1

自然な依存関係がある場合を除いて、タスクは任意の順序で実行できるため、a1、a2、a3 は、同じスレッドに割り当てるか、前の a# タスクが完了したことがわかるまでこれらをブロックすることによって、その順序で処理する必要があります。

現在、Java Concurrency パッケージは使用していませんが、スレッド管理を活用するために変更を検討しています。

誰もがこれを達成する方法について同様の解決策または提案を持っていますか

4

8 に答える 8

18

同じキーを持つタスクのタスク順序を保証する独自の Executor を作成します。同じキーを持つ注文タスクにキューのマップを使用します。キー付きの各タスクは、同じキーで次のタスクを実行します。

このソリューションは、委任された Executor からのRejectedExecutionExceptionまたはその他の例外を処理しません。したがって、委任された Executor は「無制限」である必要があります。

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{

    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();

    public OrderingExecutor(Executor delegate){
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // task without key can be executed immediately
        delegate.execute(task);
    }

    public void execute(Runnable task, Object key) {
        if (key == null){ // if key is null, execute without ordering
            execute(task);
            return;
        }

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks){
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null){
                dependencyQueue = new LinkedList<Runnable>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
                dependencyQueue.add(wrappedTask);
        }

        // execute method can block, call it outside synchronize block
        if (first)
            delegate.execute(wrappedTask);

    }

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask implements Runnable{

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            try{
                task.run();
            } finally {
                Runnable nextTask = null;
                synchronized (keyedTasks){
                    if (dependencyQueue.isEmpty()){
                        keyedTasks.remove(key);
                    }else{
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask!=null)
                    delegate.execute(nextTask);
            }
        }
    }
}
于 2014-02-21T13:15:55.713 に答える
14

過去にこれを行ったとき、通常はコンポーネントによって順序付けが処理され、呼び出し可能/実行可能をエグゼキューターに送信していました。

何かのようなもの。

  • 実行するタスクのリストを取得しました。一部には依存関係があります
  • Executor を作成し、ExecutorCompletionService でラップします。
  • 依存関係のないすべてのタスクを検索し、完了サービスを介してスケジュールします
  • 補完サービスをポーリングする
  • 各タスクが完了するたびに
    • 「完了」リストに追加する
    • 「完了リスト」に関連する待機中のタスクを再評価して、「依存関係が完了」しているかどうかを確認します。もしそうなら、それらをスケジュールする
    • すべてのタスクが送信/完了するまで、すすぎを繰り返します

完了サービスは、一連の Future をポーリングするのではなく、完了したタスクを取得できる優れた方法です。ただしMap<Future, TaskIdentifier>、完了サービスが完了した Future を提供するときに、それがどれであるかを把握できるように、完了サービスを介してタスクがスケジュールされたときにデータが取り込まれたを保持することをお勧めしますTaskIdentifier

タスクがまだ実行を待機しているが、何も実行されておらず、何もスケジュールできない状態になっている場合は、循環依存の問題があります。

于 2010-08-28T20:25:56.367 に答える
3

RunnableまたはCallableに送信すると、見返りにExecutorServiceを受け取りますFuture。a1 に依存するスレッドに a1 を渡しFutureて を呼び出しますFuture.get()。これは、スレッドが完了するまでブロックされます。

そう:

ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
  @Override
  public void run() {
    f1.get();
    ... // do stuff
  }
}
exec.submit(a2);

等々。

于 2010-01-28T10:13:16.520 に答える
2

もう 1 つのオプションは、独自のエグゼキューターを作成し、それを OrderedExecutor と呼び、カプセル化された ThreadPoolExecutor オブジェクトの配列を作成し、内部エグゼキューターごとに 1 つのスレッドを作成することです。次に、内部オブジェクトの 1 つを選択するためのメカニズムを提供します。たとえば、クラスのユーザーが実装できるインターフェイスを提供することでこれを行うことができます。

executor = new OrderedExecutor( 10 /* プールサイズ */, new OrderedExecutor.Chooser() {
  public int choose( 実行可能 実行可能 ) {
     MyRunnable myRunnable = (MyRunnable)runnable;
     myRunnable.someId() を返します。
  });

executor.execute( new MyRunnable() );

OrderedExecutor.execute() の実装は、Chooser を使用して int を取得します。これをプール サイズで変更すると、それが内部配列へのインデックスになります。「someId()」はすべての「a」などに対して同じ値を返すという考えです。

于 2010-08-28T20:16:21.843 に答える
1

この問題のために OrderingExecutor を作成しました。異なるランナブルを使用してメソッド execute() に同じキーを渡すと、同じキーを使用したランナブルの実行は、execute() が呼び出された順序になり、オーバーラップすることはありません。

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

/**
 * Special executor which can order the tasks if a common key is given.
 * Runnables submitted with non-null key will guaranteed to run in order for the same key.
 *
 */
public class OrderedExecutor {

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
            new ConcurrentLinkedQueue<Runnable>());

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>();
    private Executor delegate;
    private volatile boolean stopped;

    public OrderedExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    public void execute(Runnable runnable, Object key) {
        if (stopped) {
            return;
        }

        if (key == null) {
            delegate.execute(runnable);
            return;
        }

        Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> {
            v.add(runnable);
            return v;
        });
        if (queueForKey == null) {
            // There was no running task with this key
            Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>());
            newQ.add(runnable);
            // Use putIfAbsent because this execute() method can be called concurrently as well
            queueForKey = taskMap.putIfAbsent(key, newQ);
            if (queueForKey != null)
                queueForKey.add(runnable);
            delegate.execute(new InternalRunnable(key));
        }
    }

    public void shutdown() {
        stopped = true;
        taskMap.clear();
    }

    /**
     * Own Runnable used by OrderedExecutor.
     * The runnable is associated with a specific key - the Queue&lt;Runnable> for this
     * key is polled.
     * If the queue is empty, it tries to remove the queue from taskMap. 
     *
     */
    private class InternalRunnable implements Runnable {

        private Object key;

        public InternalRunnable(Object key) {
            this.key = key;
        }

        @Override
        public void run() {
            while (true) {
                // There must be at least one task now
                Runnable r = taskMap.get(key).poll();
                while (r != null) {
                    r.run();
                    r = taskMap.get(key).poll();
                }
                // The queue emptied
                // Remove from the map if and only if the queue is really empty
                boolean removed = taskMap.remove(key, EMPTY_QUEUE);
                if (removed) {
                    // The queue has been removed from the map,
                    // if a new task arrives with the same key, a new InternalRunnable
                    // will be created
                    break;
                } // If the queue has not been removed from the map it means that someone put a task into it
                  // so we can safely continue the loop
            }
        }
    }

    /**
     * Special Queue implementation, with equals() and hashCode() methods.
     * By default, Java SE queues use identity equals() and default hashCode() methods.
     * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()).
     *
     * @param <E> The type of elements in the queue.
     */
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> {

        private Queue<E> delegate;

        public QueueWithHashCodeAndEquals(Queue<E> delegate) {
            this.delegate = delegate;
        }

        public boolean add(E e) {
            return delegate.add(e);
        }

        public boolean offer(E e) {
            return delegate.offer(e);
        }

        public int size() {
            return delegate.size();
        }

        public boolean isEmpty() {
            return delegate.isEmpty();
        }

        public boolean contains(Object o) {
            return delegate.contains(o);
        }

        public E remove() {
            return delegate.remove();
        }

        public E poll() {
            return delegate.poll();
        }

        public E element() {
            return delegate.element();
        }

        public Iterator<E> iterator() {
            return delegate.iterator();
        }

        public E peek() {
            return delegate.peek();
        }

        public Object[] toArray() {
            return delegate.toArray();
        }

        public <T> T[] toArray(T[] a) {
            return delegate.toArray(a);
        }

        public boolean remove(Object o) {
            return delegate.remove(o);
        }

        public boolean containsAll(Collection<?> c) {
            return delegate.containsAll(c);
        }

        public boolean addAll(Collection<? extends E> c) {
            return delegate.addAll(c);
        }

        public boolean removeAll(Collection<?> c) {
            return delegate.removeAll(c);
        }

        public boolean retainAll(Collection<?> c) {
            return delegate.retainAll(c);
        }

        public void clear() {
            delegate.clear();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof QueueWithHashCodeAndEquals)) {
                return false;
            }
            QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj;
            return Arrays.equals(toArray(), other.toArray());
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(toArray());
        }

    }

}
于 2015-07-28T16:48:25.267 に答える
0

Habanero-Java ライブラリには、タスク間の依存関係を表現し、スレッドブロッキング操作を回避するために使用できるデータ駆動型タスクの概念があります。内部では、Habanero-Java ライブラリは JDK の ForkJoinPool (つまり ExecutorService) を使用します。

たとえば、タスク A1、A2、A3、... のユース ケースは次のように表現できます。

HjFuture a1 = future(() -> { doA1(); return true; });
HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; });
HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });

a1、a2、および a3 は HjFuture 型のオブジェクトへの単なる参照であり、実行時にタスク A2 および A3 が発生したときに依存関係を指定するためにカスタム データ構造で維持できることに注意してください。

利用可能ないくつかのチュートリアル スライドがありますjavadocAPI 概要、および入門書として、詳細なドキュメントを見つけることができます。

于 2014-04-01T22:23:29.210 に答える