8

(新しい Producer APIを使用して) Kafka クラスターに複数のメッセージを連続して発行すると、Futureメッセージごとにプロデューサーから が取得されます。

ここで、プロデューサを構成したと仮定して、最後の未来を待って、以前のすべてが (そして順番に) 配信されたことを確認できますかmax.in.flight.requests.per.connection = 1? retries > 0それとも、すべての先物を待つ必要がありますか? コードでは、これを行うことができますか:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}

これの代わりに:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException e) {
  //handle exception
}

ここで何もキャッチされない場合 (最初のスニペットから):

try {
  f.get();
} catch(ExecutionException e) {

次に、すべてのメッセージがクラスターに順番に格納され(プロデューサーが内部で再試行を実行したかどうかに関係なく)、何か問題が発生した場合、最後の未来ではなくても例外が発生します (私が問題が最初に発生したのは?

他に注意すべき奇妙なコーナー ケースはありますか?

4

2 に答える 2

2

これを行うことができますが a) 再試行を無限 (または事実上無限) に設定し、b) 再試行不可能な例外が発生した場合にデータを破棄しても問題ない場合に限ります。

もう少し説明すると、Kafka には 2 つのクラスの例外があります。再試行可能な例外は、再度実行すると成功する可能性がある失敗です。たとえば、NotEnoughReplicasException必要な数よりもレプリカが少ないため、リクエストが拒否されることを示します。しかし、失敗したブローカーがオンラインに戻った場合は、十分な数のレプリカがあり、良好な状態に戻っている可能性があり、再度送信するとリクエストは成功します。対照的に、 aSerializationExceptionは再試行可能ではありません。これは、シリアル化を再試行すると結果が異なると信じる理由がないためです。

プロデューサーの再試行は、再試行不可能な例外が発生するまでのみ適用されます。したがって、これらのいずれにもヒットせず、無限の再試行を使用し、言及した他の設定を使用すると、最終的な未来が解決されると、注文と配信の成功が保証されます. ただし、再試行不可能な例外が発生する可能性があるため、各フューチャー (またはコールバック) を処理し、リクエストが失敗した場合に少なくとも何かをログに記録することをお勧めします。

于 2016-08-05T01:59:18.433 に答える
2

Ewen が言ったことに加えて、ループですべてのメッセージを送信し終わった後に、 flush()を呼び出すこともできます。この呼び出しは、すべての先物が完了するまでブロックされるため、この後、例外がないか先物を確認できます。ただし、これを行うには、すべての先物を保持する必要があります。

別の方法は、以下に示すように、送信でコールバックを使用し、返された例外を保存することです。再度フラッシュを使用すると、例外をチェックする前に、すべての送信が完了していることを確認できます。

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}
于 2017-02-01T14:24:34.197 に答える