3

Executor と Blocking Queue を使用していくつかのタスクを実行するコードがあります。結果はイテレータとして返される必要があります。これは、私が取り組んでいるアプリケーションが期待するものだからです。ただし、キューに追加されたタスクと結果の間には 1:N の関係があるため、ExecutorCompletionServiceを使用することはできません。hasNext() を呼び出している間、キューからの結果の取得を停止できるように、すべてのタスクがいつ終了し、すべての結果がキューに追加されたかを知る必要があります。アイテムがキューに入れられると、別のスレッドが消費する準備ができている必要があることに注意してください ( Executor.invokeAll()、すべてのタスクが完了するまでブロックしますが、これは私が望むものでもタイムアウトでもありません)。これは私の最初の試みでした。機能しない場合でも、ポイントを示すためだけに AtomicInteger を使用しています。誰かがこの問題を解決する方法を理解するのを手伝ってくれますか?

public class ResultExecutor<T> implements Iterable<T> {
    private BlockingQueue<T> queue;
    private Executor executor;
    private AtomicInteger count;

    public ResultExecutor(Executor executor) {
        this.queue = new LinkedBlockingQueue<T>();
        this.executor = executor;
        count = new AtomicInteger();            
    }

    public void execute(ExecutorTask task) {
        executor.execute(task);
    }

    public Iterator<T> iterator() {
        return new MyIterator();
    }

    public class MyIterator implements Iterator<T> {
        private T current;          
        public boolean hasNext() {
            if (count.get() > 0 && current == null)
            {
                try {
                    current = queue.take();
                    count.decrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return current != null;
        }

        public T next() {
            final T ret = current;
            current = null;
            return ret;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

    public class ExecutorTask implements Runnable{
        private String name;

        public ExecutorTask(String name) {
            this.name = name;

        }

         private int random(int n)
         {
           return (int) Math.round(n * Math.random());
         }


        @SuppressWarnings("unchecked")
        public void run() {
            try {
                int random = random(500);
                Thread.sleep(random);
                queue.put((T) (name + ":" + random + ":1"));
                queue.put((T) (name + ":" + random + ":2"));
                queue.put((T) (name + ":" + random + ":3"));
                queue.put((T) (name + ":" + random + ":4"));
                queue.put((T) (name + ":" + random + ":5"));

                count.addAndGet(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }                   
        }           
    }       

}

呼び出しコードは次のようになります。

    Executor e = Executors.newFixedThreadPool(2);
    ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);

    resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
    resultExecutor.execute(resultExecutor.new ExecutorTask("B"));

    Iterator<Result> iter = resultExecutor.iterator();
    while (iter.hasNext()) {
        System.out.println(iter.next());
    }
4

5 に答える 5

3

で「毒」オブジェクトを使用Queueして、タスクがこれ以上結果を提供しないことを通知します。

class Client
{

  public static void main(String... argv)
    throws Exception
  {
    BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    ExecutorService workers = Executors.newFixedThreadPool(2);
    workers.execute(new ExecutorTask("A", queue));
    workers.execute(new ExecutorTask("B", queue));
    Iterator<String> results = 
      new QueueMarkersIterator<String>(queue, ExecutorTask.MARKER, 2);
    while (results.hasNext())
      System.out.println(results.next());
  }

}

class QueueMarkersIterator<T>
  implements Iterator<T>
{

  private final BlockingQueue<? extends T> queue;

  private final T marker;

  private int count;

  private T next;

  QueueMarkersIterator(BlockingQueue<? extends T> queue, T marker, int count)
  {
    this.queue = queue;
    this.marker = marker;
    this.count = count;
    this.next = marker;
  }

  public boolean hasNext()
  {
    if (next == marker)
      next = nextImpl();
    return (next != marker);
  }

  public T next()
  {
    if (next == marker)
      next = nextImpl();
    if (next == marker)
      throw new NoSuchElementException();
    T tmp = next;
    next = marker;
    return tmp;
  }

  /*
   * Block until the status is known. Interrupting the current thread 
   * will cause iteration to cease prematurely, even if elements are 
   * subsequently queued.
   */
  private T nextImpl()
  {
    while (count > 0) {
      T o;
      try {
        o = queue.take();
      }
      catch (InterruptedException ex) {
        count = 0;
        Thread.currentThread().interrupt();
        break;
      }
      if (o == marker) {
        --count;
      }
      else {
        return o;
      }
    }
    return marker;
  }

  public void remove()
  {
    throw new UnsupportedOperationException();
  }

}

class ExecutorTask
  implements Runnable
{

  static final String MARKER = new String();

  private static final Random random = new Random();

  private final String name;

  private final BlockingQueue<String> results;

  public ExecutorTask(String name, BlockingQueue<String> results)
  {
    this.name = name;
    this.results = results;
  }

  public void run()
  {
    int random = ExecutorTask.random.nextInt(500);
    try {
      Thread.sleep(random);
    }
    catch (InterruptedException ignore) {
    }
    final int COUNT = 5;
    for (int idx = 0; idx < COUNT; ++idx)
      results.add(name + ':' + random + ':' + (idx + 1));
    results.add(MARKER);
  }

}
于 2010-07-09T16:47:40.673 に答える
1

私は未来があなたが探しているものだと信じています。これにより、非同期タスクを結果オブジェクトに関連付け、その結果のステータスをクエリできます。開始するタスクごとに、その参照を保持し、それFutureを使用して完了したかどうかを判断します。

于 2010-07-09T16:33:16.310 に答える
1

If I understand your problem correctly (which I'm not sure I do), you can prevent an infinite wait on an empty queue by using [BlockingQueue.poll][1] instead of take(). This lets you specify a timeout, after which time null will be returned if the queue is empty.

If you drop this straight into your hasNext implementation (with an appropriately short timeout), the logic will be correct. An empty queue will return false while a queue with entities remaining will return true.

[1]: http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#poll(long, java.util.concurrent.TimeUnit)

于 2010-07-09T16:42:02.717 に答える
0

これは、待機/通知、 AtomicInteger、およびコールバックを備えた非ブロッキングキューを使用する代替ソリューションです。

public class QueueExecutor implements CallbackInterface<String> {

    public static final int NO_THREADS = 26;

    private Object syncObject = new Object();
    private AtomicInteger count;
    Queue<String> queue = new LinkedList<String>();

    public void execute() {
        count = new AtomicInteger(NO_THREADS);
        ExecutorService executor = Executors.newFixedThreadPool(NO_THREADS/2);
        for(int i=0;i<NO_THREADS;i++)
            executor.execute(new ExecutorTask<String>("" + (char) ('A'+i), queue, this));

        Iterator<String> iter = new QueueIterator<String>(queue, count);
        int count = 0;
        while (iter.hasNext()) {

            System.out.println(iter.next());
            count++;
        }

        System.out.println("Handled " + count + " items");
    }

    public void callback(String result) {
        System.out.println(result);
        count.decrementAndGet();
        synchronized (syncObject) {
            syncObject.notify();
        }
    }

    public class QueueIterator<T> implements Iterator<T> {
        private Queue<T> queue;
        private AtomicInteger count;

        public QueueIterator(Queue<T> queue, AtomicInteger count) {
            this.queue = queue;
            this.count = count;
        }

        public boolean hasNext() {          
            while(true) {                   
                synchronized (syncObject) {
                    if(queue.size() > 0)
                        return true;

                    if(count.get() == 0)
                        return false;

                    try {
                        syncObject.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public T next() {

            synchronized (syncObject) {
                if(hasNext())
                    return queue.remove();
                else
                    return null;
            }
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

    class ExecutorTask<T> implements Runnable {
        private String name;
        private Queue<T> queue;
        private CallbackInterface<T> callback;

        public ExecutorTask(String name, Queue<T> queue,
                CallbackInterface<T> callback) {
            this.name = name;
            this.queue = queue;
            this.callback = callback;
        }

        @SuppressWarnings("unchecked")
        public void run() {
            try {
                Thread.sleep(1000);
                                    Random randomX = new Random();
                for (int i = 0; i < 5; i++) {
                    synchronized (syncObject) {
                        Thread.sleep(randomX.nextInt(10)+1);

                        queue.add((T) (name + ":" + ":" + i));
                        syncObject.notify();
                    }
                }

                callback.callback((T) (name + ": Done"));

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

public interface CallbackInterface<T> {
    void callback(T result);
}

そして、呼び出しコードは単純です:

    QueueExecutor exec = new QueueExecutor();
    exec.execute();
于 2010-07-12T15:59:52.477 に答える
0

I am not sure I understand you, but why can't the worker threads put themselves Lists onto the Queue. You can then make a custom iterator that goes over the queue in an outer loop and through the subiterators. All without concurrency magic.

于 2013-03-07T23:17:26.670 に答える