この場合(任意のバージョン)、プロデューサーが「永久に」実行する必要があるExecutorServiceを使用する単一のプロデューサー複数のコンシューマーを実装するためのJavaでの最良のアプローチを見つけようとしています(必要ではありません)。すべてが完全に処理されるまで待機する必要があります。たとえば、すべてのコンシューマー スレッドが終了し、キューが空になり、生成されるアイテムが残っていません。プロデューサは、データ ソースも一定の間隔でのみポーリングする必要があります。
例として、30 分ごとにプロデューサーがレコードのデータ ソースをポーリングし、それらをキューに送信するようにします。コンシューマーが処理に 30 分以上かかっている場合は、データ ソースを再度ポーリングする前に、すべてのアイテムが処理されるまでプロデューサーに待機してもらいます (30 分が経過するとすぐにポーリングします)。
私のコードを書いてくれる人を探しているわけではありません。いくつかの基本的なヒント/ガイダンスをいただければ幸いです。
これは、私が作業しようとしている短縮された実装例です。私は問題を解決するための恐ろしい試みをすべてやめました。ThreadPoolExecutor を構築するためにハードコードされたパラメーターは、最終的に動的になることに注意してください。
import java.util.concurrent.*;
public class ItemProcessorService {
public static void main(String args[]) throws InterruptedException {
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
int corePoolSize = 5,
maxPoolSize = 10,
keepAlive = 10,
queueCapacity = 1;
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);
for (int i = 0; i < 10; i++) {
executor.execute(new ItemConsumer());
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Executor finished");
}
}
class ItemConsumer implements Runnable {
@Override
public void run() {
processItem();
}
private void processItem() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " - processed item");
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " - rejected");
}
}