kafka-smallrye
サービスの信頼性を満たすために、デシリアライズできなかったすべての着信メッセージを、 andを使用して配信不能トピックにプッシュする必要がありますquarkus
。
トピックに関するすべてのメッセージは avro 形式である必要があります (ただし、私にはわかりませんでした) スキーマ レジストリにスキーマが定義されています。
この方法でコンシューマーの構成を設定しました。
mp:
messaging:
incoming:
test-in:
connector: smallrye-kafka
group:
id: test-in-consumer-group
topic: events-topic
failure-strategy: dead-letter-queue
schema:
registry:
url: http://localhost:8081
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
specific:
avro:
reader: true
私の消費者コード:
@ApplicationScoped
public class Consumer {
@Incoming("test-in")
public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
String schemaFullName = data.getPayload().getSchema().getFullName();
System.out.println(schemaFullName);
// other consumer code
return data.ack();
}
}
コンシューマーがメッセージを逆シリアル化できない場合、消費プロセスはブロックされ、代わりにメッセージをデッド レターに移動して続行します。逆シリアル化エラーが生成されないnack
ため、メッセージをデッドレターに移動できなかったと思います。
デッドレターのトピックにデシリアライズできないメッセージを移動する方法はありますか?