トポロジを構築していて、KStream.process()を使用して中間値をデータベースに書き込みたいと考えています。このステップはデータの性質を変更せず、完全にステートレスです。
プロセッサを追加するには、ProcessorSupplierを作成し、このインスタンスを状態ストアの名前とともに関数に渡す必要があります。これは私が理解していないものです。KStream.process()
StateStoreSupplierが必要なため、StateStoreオブジェクトをトポロジに追加する方法は?
上記の追加に失敗するとStateStore
、アプリケーションの起動時に次のエラーが発生します。
スレッド「メイン」org.apache.kafka.streams.errors.TopologyBuilderException の例外: 無効なトポロジの構築: StateStore my-state-store はまだ追加されていません。
プロセッサが状態ストアを持つ必要があるのはなぜですか? これは、ステートレスで状態を維持しないプロセッサの場合、オプションになる可能性があるようです。
Processor を適用して、このストリーム内のすべての要素を一度に 1 要素ずつ処理します。