Spring Cloud を介してマップされた 2 つのストリーム プロセッサを備えた SpringBoot アプリケーションがあります。各プロセッサには、さまざまなトピック用の独自の @StreamListener があります。1 つのプロセッサが集計データを quarable ステート ストアに書き込みます。@Service (サービスは状態ストアから集計データを取得します) を介してデータを取得するときに、単体テストで問題に直面しています。何らかの理由で時々例外をキャッチします:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
別のプロセッサから StreamListener を削除すると、すべて正常に安定して動作します。
適切なプロセッサで正確なインスタンスの状態ストアをバインドする方法は?