問題タブ [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka-streams - Kafkaストリームの値としてHashMapを使用して状態ストアを作成するには?
String キー HashMap を値として持つ状態ストアを作成する必要があります。以下の2つの方法を試しました。
コードはエラーなしで正常にコンパイルされますが、実行時エラーが発生します
状態ストアを作成する正しい方法を教えてもらえますか?
apache-kafka-streams - Kafka ストリームに状態ストアを作成できません
Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
kafka ストリーム アプリケーションで状態ストアを作成しているときに、このエラーが発生します。アプリケーションの完全なスタック トレースは次のとおりです。
そして、以下のようにステートストアを作成します
これを修正する方法はありますか?
java - Kafka KStream - ウィンドウでの AbstractProcessor の使用
KStream からの出力のウィンドウ化されたバッチをグループ化し、それらをセカンダリ ストアに書き込むことを望んでいます。
.punctuate()
おおよそ30秒ごとに呼び出されることを期待していました。代わりに得たものはここに保存されます。
(元のファイルは数千行の長さでした)
要約 -.punctuate()
一見ランダムに、そして繰り返し呼び出されています。ProcessorContext.schedule()を介して設定された値に準拠していないようです。
編集:
同じコードをもう一度実行すると、.punctuate()
約 4 分ごとに への呼び出しが生成されました。今回はクレイジーな繰り返し値は見られませんでした。ソースに変更はありません。結果が異なるだけです。
次のコードを使用します。
主要
プロセッサ
プロセッサーサプライヤー
編集:
私のデバッガーがこれを遅くしていないことを確認するために、私はそれをビルドし、私のkafkaプロセスと同じボックスで実行しました. 今回は、4 分以上遅れることさえありませんでした。数秒以内に、偽の呼び出しを出力していました.punctuate()
。これらの多く (ほとんど) には、 への呼び出しが介在していません.process()
。
java - Kafka KTable - マシン間で共有される集約
多数のパーティションを持つトピックがあるとします。そこに K/V データを書き込んでおり、そのデータをタンブリング ウィンドウにキーで集約したいと考えています。
パーティションと同じ数のワーカー インスタンスを起動し、各ワーカー インスタンスが別のマシンで実行されているとします。
結果の集計に各キーのすべての値が含まれるようにするにはどうすればよいですか? IE 各ワーカーインスタンスに値のサブセットを持たせたくありません。
これはStateStoreが使用されるものですか? Kafka はこれを独自に管理しますか、それとも方法を考え出す必要がありますか?
java - Kafka KStream - トポロジ設計
私のストリームは、「生」として 60 秒の集計でデータベースに保存したいキーと値のペアです。もともと私はこれをやっていました:
しかし、それから私はそれを知りました
を。.aggregateby()
一致するペアのみを返します(一致するかどうかにかかわらず、すべてが必要です)
b. フェーズでHashMapを使用して、同じ集計効果を実現できました。.process()
次に.punctuate()
が呼び出されると、すべての k/v ペアをデータベースに書き込みます。
したがって、結果のトポロジは次のようになります。
kStreamBuilder.stream->foreach
kStreamBuilder.stream->process
質問:
- これは、一致するすべての kv ペアを書き込む結果を達成するための「合理的な」方法ですか? ( foreachおよび任意のペアを介したすべての値+ processを介した残り)
- 元のストリームを送信する前に(どういうわけか)分割する必要がありますか
.foreach()
、.process()
または上記を実行するだけで十分ですか?