Executor と Blocking Queue を使用していくつかのタスクを実行するコードがあります。結果はイテレータとして返される必要があります。これは、私が取り組んでいるアプリケーションが期待するものだからです。ただし、キューに追加されたタスクと結果の間には 1:N の関係があるため、ExecutorCompletionServiceを使用することはできません。hasNext() を呼び出している間、キューからの結果の取得を停止できるように、すべてのタスクがいつ終了し、すべての結果がキューに追加されたかを知る必要があります。アイテムがキューに入れられると、別のスレッドが消費する準備ができている必要があることに注意してください ( Executor.invokeAll()、すべてのタスクが完了するまでブロックしますが、これは私が望むものでもタイムアウトでもありません)。これは私の最初の試みでした。機能しない場合でも、ポイントを示すためだけに AtomicInteger を使用しています。誰かがこの問題を解決する方法を理解するのを手伝ってくれますか?
public class ResultExecutor<T> implements Iterable<T> {
private BlockingQueue<T> queue;
private Executor executor;
private AtomicInteger count;
public ResultExecutor(Executor executor) {
this.queue = new LinkedBlockingQueue<T>();
this.executor = executor;
count = new AtomicInteger();
}
public void execute(ExecutorTask task) {
executor.execute(task);
}
public Iterator<T> iterator() {
return new MyIterator();
}
public class MyIterator implements Iterator<T> {
private T current;
public boolean hasNext() {
if (count.get() > 0 && current == null)
{
try {
current = queue.take();
count.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return current != null;
}
public T next() {
final T ret = current;
current = null;
return ret;
}
public void remove() {
throw new UnsupportedOperationException();
}
}
public class ExecutorTask implements Runnable{
private String name;
public ExecutorTask(String name) {
this.name = name;
}
private int random(int n)
{
return (int) Math.round(n * Math.random());
}
@SuppressWarnings("unchecked")
public void run() {
try {
int random = random(500);
Thread.sleep(random);
queue.put((T) (name + ":" + random + ":1"));
queue.put((T) (name + ":" + random + ":2"));
queue.put((T) (name + ":" + random + ":3"));
queue.put((T) (name + ":" + random + ":4"));
queue.put((T) (name + ":" + random + ":5"));
count.addAndGet(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
呼び出しコードは次のようになります。
Executor e = Executors.newFixedThreadPool(2);
ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);
resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
resultExecutor.execute(resultExecutor.new ExecutorTask("B"));
Iterator<Result> iter = resultExecutor.iterator();
while (iter.hasNext()) {
System.out.println(iter.next());
}