(新しい Producer APIを使用して) Kafka クラスターに複数のメッセージを連続して発行すると、Future
メッセージごとにプロデューサーから が取得されます。
ここで、プロデューサを構成したと仮定して、最後の未来を待って、以前のすべてが (そして順番に) 配信されたことを確認できますかmax.in.flight.requests.per.connection = 1
? retries > 0
それとも、すべての先物を待つ必要がありますか? コードでは、これを行うことができますか:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
f.get();
} catch(ExecutionException e) {
//handle exception
}
これの代わりに:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
for(Future<?> f : futureList) {
f.get();
}
} catch(ExecutionException e) {
//handle exception
}
ここで何もキャッチされない場合 (最初のスニペットから):
try {
f.get();
} catch(ExecutionException e) {
次に、すべてのメッセージがクラスターに順番に格納され(プロデューサーが内部で再試行を実行したかどうかに関係なく)、何か問題が発生した場合、最後の未来ではなくても例外が発生します (私が問題が最初に発生したのは?
他に注意すべき奇妙なコーナー ケースはありますか?