0

Spring Cloud を介してマップされた 2 つのストリーム プロセッサを備えた SpringBoot アプリケーションがあります。各プロセッサには、さまざまなトピック用の独自の @StreamListener があります。1 つのプロセッサが集計データを quarable ステート ストアに書き込みます。@Service (サービスは状態ストアから集計データを取得します) を介してデータを取得するときに、単体テストで問題に直面しています。何らかの理由で時々例外をキャッチします:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)

別のプロセッサから StreamListener を削除すると、すべて正常に安定して動作します。

適切なプロセッサで正確なインスタンスの状態ストアをバインドする方法は?

4

1 に答える 1

0

私は自分の問題の解決策を見つけました。おそらく他の人を助けるでしょう。spring-cloud-stream-binder-kafka-streams の最新バージョンではありません。私のバージョンは 2.0.0 で、バグhttps://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366があります。バージョン 2.0.1 でのみ修正されました。

于 2018-09-13T13:52:16.387 に答える