
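I switched to exactly-once semantics (EOS) on Kafka Streams (KS) 1.1.0. This is a clean local environment, and every topic has 2 partitions. I produce a few messages with ENABLE_IDEMPOTENCE_CONFIG=true.

When I restart the app, the global state store restore goes into an infinite loop.

I implemented a StateRestoreListener and added logging in onRestoreStart, onRestoreEnd, and onBatchRestored.

This is what shows up repeatedly in the logs (the topic name is bu):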

DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,590 | org.apache.kafka.clients.NetworkClient | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending metadata request (type=MetadataRequest, topics=bu) to node jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,591 | org.apache.kafka.clients.Metadata | Updated cluster metadata version 3 to Cluster(id = Kw31wtS9TYm__QEmSUDgdg, nodes = [jonathan:9092 (id: 0 rack: null)], partitions = [Partition(topic = bu, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = bu, partition = 1, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])])
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,592 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={bu-1=-1, bu-0=-1}, isolationLevel=READ_COMMITTED) to broker jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,595 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Handling ListOffsetResponse response for bu-1. Fetched offset 12, timestamp -1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Handling ListOffsetResponse response for bu-0. Fetched offset 16, timestamp -1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Subscribed to partition(s): bu-1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Seeking to offset 9 for partition bu-1


{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:START] bu bu-1, startingOffset:9","endOfBatch":false}


DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,640 | org.apache.kafka.common.metrics.Metrics | Added sensor with name topic.bu.bytes-fetched
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,642 | org.apache.kafka.common.metrics.Metrics | Added sensor with name topic.bu.records-fetched
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,642 | org.apache.kafka.common.metrics.Metrics | Added sensor with name bu-1.records-lag
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Added READ_COMMITTED fetch request for partition bu-1 at offset 12 to node jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.FetchSessionHandler | [Consumer clientId=AppName-global-restore-consumer, groupId=] Built incremental fetch (sessionId=1036463678, epoch=1) for node 0. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(bu-1), toForget=(), implied=()) to broker jonathan:9092 (id: 0 rack: null)

{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-1, batchEndOffset:12","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:END] bu bu-1, totalRestored:1","endOfBatch":false}


DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Subscribed to partition(s): bu-0
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.common.metrics.Metrics | Removed sensor with name bu-1.records-lag
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Seeking to offset 15 for partition bu-0

{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:START] bu bu-0, startingOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
...
...

I couldn't find any known issue related to EOS and global state stores. Has anyone seen this?


1 Answer


I found this related issue: http://mail-archives.apache.org/mod_mbox/kafka-users/201711.mbox/%3CCAG53Ff=H8e_DeAKq7BOz_LdMtf2wD_SETM9PBonwZyBNJ2HZ3w@mail.gmail.com%3E

For now, I am not enabling EOS for the app that produces records to the topic backing the global table. That resolves the infinite loop when the global table is restored.
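A rough sketch of what "not enabling EOS" on the producing app can look like, using plain string config keys so it compiles without the Kafka jars. Which of the two variants applies depends on whether the producing app is a Kafka Streams app or a plain producer, which is not stated here. The class name, the application.id value, and reusing the broker address from the logs above are assumptions for illustration only; serializers and any other required settings are left out.

```java
import java.util.Properties;

// Hypothetical holder class; only the config keys and values are real Kafka settings.
public class GlobalTableFeederConfig {

    // Variant 1: the producing app is a Kafka Streams app.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "global-table-feeder");  // hypothetical name
        p.put("bootstrap.servers", "jonathan:9092");      // broker seen in the logs
        // Stay on the default guarantee instead of "exactly_once".
        p.put("processing.guarantee", "at_least_once");
        return p;
    }

    // Variant 2: the producing app is a plain KafkaProducer.
    static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "jonathan:9092");      // broker seen in the logs
        // Same key that ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG resolves to.
        p.put("enable.idempotence", "false");
        // "transactional.id" is deliberately not set, so no transactions are used.
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(producerProps());
    }
}
```

Only the app that writes to the global table's topic needs this; as far as I can tell, the app that reads the global table can keep its own settings.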

answered 2018-06-18T06:20:20.710