問題タブ [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
3839 参照

apache-kafka-streams - Kafkaストリームの値としてHashMapを使用して状態ストアを作成するには?

String キー HashMap を値として持つ状態ストアを作成する必要があります。以下の2つの方法を試しました。

コードはエラーなしで正常にコンパイルされますが、実行時エラーが発生します

状態ストアを作成する正しい方法を教えてもらえますか?

0 投票する
3 に答える
9291 参照

apache-kafka-streams - Kafka ストリームに状態ストアを作成できません

Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1kafka ストリーム アプリケーションで状態ストアを作成しているときに、このエラーが発生します。アプリケーションの完全なスタック トレースは次のとおりです。

そして、以下のようにステートストアを作成します

これを修正する方法はありますか?

0 投票する
3 に答える
1746 参照

java - Kafka KStream - ウィンドウでの AbstractProcessor の使用

KStream からの出力のウィンドウ化されたバッチをグループ化し、それらをセカンダリ ストアに書き込むことを望んでいます。

.punctuate()おおよそ30秒ごとに呼び出されることを期待していました。代わりに得たものはここに保存されます。

(元のファイルは数千行の長さでした)

要約 -.punctuate()一見ランダムに、そして繰り返し呼び出されています。ProcessorContext.schedule()を介して設定された値に準拠していないようです。


編集:

同じコードをもう一度実行すると、.punctuate()約 4 分ごとに への呼び出しが生成されました。今回はクレイジーな繰り返し値は見られませんでした。ソースに変更はありません。結果が異なるだけです。

次のコードを使用します。

主要

プロセッサ

プロセッサーサプライヤー


編集:

私のデバッガーがこれを遅くしていないことを確認するために、私はそれをビルドし、私のkafkaプロセスと同じボックスで実行しました. 今回は、4 分以上遅れることさえありませんでした。数秒以内に、偽の呼び出しを出力していました.punctuate()。これらの多く (ほとんど) には、 への呼び出しが介在していません.process()

0 投票する
2 に答える
987 参照

java - Kafka KTable - マシン間で共有される集約

多数のパーティションを持つトピックがあるとします。そこに K/V データを書き込んでおり、そのデータをタンブリング ウィンドウにキーで集約したいと考えています。

パーティションと同じ数のワーカー インスタンスを起動し、各ワーカー インスタンスが別のマシンで実行されているとします。

結果の集計に各キーのすべての値が含まれるようにするにはどうすればよいですか? IE 各ワーカーインスタンスに値のサブセットを持たせたくありません。

これはStateStoreが使用されるものですか? Kafka はこれを独自に管理しますか、それとも方法を考え出す必要がありますか?

0 投票する
1 に答える
463 参照

java - Kafka KStream - トポロジ設計

私のストリームは、「生」として 60 秒の集計でデータベースに保存したいキーと値のペアです。もともと私はこれをやっていました:

しかし、それから私はそれを知りました

を。.aggregateby()一致するペアのみを返します(一致するかどうかにかかわらず、すべてが必要です)
b. フェーズでHashMapを使用して、同じ集計効果を実現できました。.process()次に.punctuate()が呼び出されると、すべての k/v ペアをデータベースに書き込みます。

したがって、結果のトポロジは次のようになります。

kStreamBuilder.stream->foreach
kStreamBuilder.stream->process

質問:

  1. これは、一致するすべての kv ペアを書き込む結果を達成するための「合理的な」方法ですか? ( foreachおよび任意のペアを介したすべての値+ processを介した残り)
  2. 元のストリームを送信する前に(どういうわけか)分割する必要がありますか.foreach().process()または上記を実行するだけで十分ですか?