現在、数百万のメッセージをキューで受信するキュー ワーカー ベースの設計を実装しようとしています。また、ワーカーは限られているため、次のコードを使用して作業をワーカーに割り当てています。私は同じために ExecutorService を使用しています:
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
while(LISTEN_FOR_MESSAGE_FLAG == true){
Message receivedMessage = sqsClient.receiveMessage(request);
if(receivedMessage == null){
Thread.sleep(5000); // sleep for 5 seconds
}
else {
// lock the message for a certain amount of time (60 secs).
// Other workers can't receive a message, when it is locked.
sqsClient.changeMessageVisibility(receivedMessage, 60);
pool.execute(new Task(receivedMessage); // process the message.
}
}
現在、キューに Amazon SQS を使用しています。上記のコードには重大な問題があります。メッセージは 5 秒ごとに受信され、可視性タイムアウトでロックされます。可視性タイムアウトがなくなると、このロックは解除されます。これにより、ワーカーはロックされていないメッセージを保持します。そのため、重複処理の問題があります。注: Amazon SQS は、可視性タイムアウトを延長する方法を提供します。
このケースを処理するために上記のコードをどのように書くことができるか、助けてください。