問題タブ [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - コンシューマ グループの Kafka ストリーム オフセットをゼロにリセット
いくつかの条件に基づいて行をフィルタリングし、MongoDB にロードするだけの Kafka Streaming アプリを作成しました。
ストリーミング プロセスは問題なく動作していますが、コードに問題があるため、データ全体を再処理したいと考えています。
1 つの方法は、ストリーミング アプリを強制終了し、コンシューマー グループ ID を変更し、mongo からデータを削除して、アプリを再実行することです。
コンシューマ グループ ID を変更せずにこのシナリオを実現する方法。
<< Kafka 0.10 バージョンを使用しています >>
どうもありがとうパリ
apache-kafka - 時間枠付きKTableの最終的なkafka-streams集計結果を送信するには?
私がやりたいことはこれです:
- 数値トピックからレコードを消費する (Long's)
- 5 秒のウィンドウごとに値を集計 (カウント) します。
- FINAL 集計結果を別のトピックに送信する
私のコードは次のようになります。
すべてが期待どおりに機能しているように見えますが、集計は着信レコードごとに宛先トピックに送信されます。私の質問は、各ウィンドウの最終集計結果のみを送信するにはどうすればよいですか?
apache-kafka-streams - Apache Kafka Streams の特定のパーティションでの集計
SensorData
2 つのセンサー S1 と S2 がデータ (タイムスタンプと値) を 2 つの異なるパーティション (S1 -> P1 と S2 -> P2 など) に送信するという名前の Kafka トピックがあるとします。ここで、これら 2 つのセンサーの値を個別に集計する必要があります。たとえば、1 時間の時間枠でセンサーの平均値を計算し、それを新しいトピックに書き込むとしますSensorData1Hour
。このシナリオで
KStreamBuilder#stream
メソッドを使用して特定のトピック パーティションを選択するにはどうすればよいですか?- 同じトピックの 2 つの (複数の) 異なるパーティションに集計関数を適用することは可能ですか?
java - kafkastreams - 処理能力を追加する
既存のFlinkアプリケーション/トポロジをKafkaStreamsを使用するように変換する POC に取り組んでいます。私の質問は展開についてです。
具体的には - Flink では、「ワーカー ノード」を flink のインストールに追加し、トポロジーにさらに並列化を追加して、増加するデータ レートに対応します。
データレートの増加に伴い、KStreams の容量を増やすにはどうすればよいですか? KStreams はこれを自動的に処理しますか? より多くのプロセス (マイクロサービス) を起動する必要がありますか?
それとも、ここで全体像を見逃していますか?
apache-kafka - Kafka ストリーム プロセッサ コンテキストでの定期的な NPE
kafka-streams 0.10.0.0 を使用すると、メッセージの転送時に StreamTask で null ポインター例外が定期的に発生します。呼び出しの 10% から 50% の間で変化します。NPE は次の方法で発生します。
場合によっては、thisNodeフィールドが null になっているようです。これを引き起こしている可能性のある考えはありますか?スタックトレースは以下です。
java - ステートレス プロセッサを登録する方法 (StateStore も必要なようです)。
トポロジを構築していて、KStream.process()を使用して中間値をデータベースに書き込みたいと考えています。このステップはデータの性質を変更せず、完全にステートレスです。
プロセッサを追加するには、ProcessorSupplierを作成し、このインスタンスを状態ストアの名前とともに関数に渡す必要があります。これは私が理解していないものです。KStream.process()
StateStoreSupplierが必要なため、StateStoreオブジェクトをトポロジに追加する方法は?
上記の追加に失敗するとStateStore
、アプリケーションの起動時に次のエラーが発生します。
スレッド「メイン」org.apache.kafka.streams.errors.TopologyBuilderException の例外: 無効なトポロジの構築: StateStore my-state-store はまだ追加されていません。
プロセッサが状態ストアを持つ必要があるのはなぜですか? これは、ステートレスで状態を維持しないプロセッサの場合、オプションになる可能性があるようです。
Processor を適用して、このストリーム内のすべての要素を一度に 1 要素ずつ処理します。