問題タブ [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
3577 参照

java - Kafka KStreams - スレッドの追加方法 / StreamsConfig.NUM_STREAM_THREADS_CONFIG の使用

私はこのパラメーターをいじっていて、奇妙なことに遭遇しました。私のアプリはそれがなくても問題なく動作しますが、次の行を構成に追加すると:

CPU 使用率がゼロを超えることはありません。アプリは何もしていないようです。エラーなし。

KStreams アプリのスレッド使用量を増やすための推奨される方法はありますか? それとも、ただ「力を信じて」、すべてを一緒に実行させますか?


編集:

  1. 私は2つのパーティションを持っています
  2. 消費者の遅れを確認しましたkafka-consumer-groups- 利用可能な多くの記録
  3. パーティションが 1 つしかない場合でも、複数のスレッドを使用しても何も起こらないのはなぜでしょうか。0% CPU。
0 投票する
2 に答える
1255 参照

apache-kafka - Kafka ストリーム reduceByKey と leftJoin

一見すると、 をKStream#reduceByKey使用すると と同じ機能を実現できるように思えKStream to KTable leftJoinます。つまり、同じキーを持つレコードを結合します。パフォーマンスの面でも、2つの違いは何ですか?

0 投票する
1 に答える
476 参照

java - kafka KStream - n 秒カウントを取るトポロジ

いくつかの値のハッシュをキーとする JSON オブジェクトのストリームがあります。n 秒 (10? 60?) 間隔でキーごとにカウントし、これらの値を使用してパターン分析を行うことを望んでいます。

私のトポロジ:K->aggregateByKey(n seconds)->process()

ステップで、process - init()私は呼び出さProcessorContent.schedule(60 * 1000L)れることを期待して.punctuate()呼び出しました。ここから、内部ハッシュの値をループして、それに応じて動作します。

値が集計ステップを通過してprocess()関数にヒットするのを見ていますが、.punctuate()呼び出されることはありません。


コード:

AggregateInit()は null を返します。

単純なタイマーで同等のことができると思います.punctuate()が、このコードが期待どおりに機能しない理由を知りたいです。

0 投票する
3 に答える
3256 参照

java - Kafka - TimestampExtractor の問題

私が使うorg.apache.kafka:kafka-streams:0.10.0.1

KStream.Process()トリガーする(「句読点」) をトリガーしていないように見える時系列ベースのストリームを操作しようとしています。(参考までにこちらをご覧ください)

構成ではKafkaStreams、このパラメーターを渡しています(特に):

ここでは、 JSON データからタイムスタンプ情報を抽出EventTimeExtractorするカスタム タイムスタンプ エクストラクタ ( を実装) を示します。org.apache.kafka.streams.processor.TimestampExtractor

新しいレコードが取り込まれるたびに、これが私のオブジェクト (から派生TimestampExtractor) を呼び出すことを期待します。問題のストリームは 2 * 10^6 レコード/分です。punctuate()60 秒に設定しましたが、起動しません。古い値を引っ張って追いつくため、データがこのスパンを非常に頻繁に通過することはわかっています。

実際、まったく呼び出されません。

  • これは、KStream レコードにタイムスタンプを設定するための間違ったアプローチですか?
  • これは、この構成を宣言する間違った方法ですか?
0 投票する
2 に答える
6966 参照

apache-kafka - Kafka の複数パーティションの順序付け

Kafka で複数のパーティションを注文することはできず、パーティションの注文はグループ内の単一のコンシューマー (単一のパーティション) に対してのみ保証されることを認識しています。しかし、Kafka Streams 0.10 でこれを達成できるようになりましたか? タイムスタンプ機能を使用して各パーティションの各メッセージがコンシューマ側で順序を維持する場合、Kafka Streams 0.10 でこれが可能になったとしましょう。すべてのメッセージを受け取ったと仮定すると、消費されたタイムスタンプに基づいてすべてのパーティションをソートし、それらを別のトピックに転送して消費することはできないでしょうか?

現時点では、順序を維持する必要がありますが、これは、1 つのコンシューマー スレッドを持つ 1 つのパーティションを持つことを意味します。これを複数のパーティションに変更して並列処理を増やしたいと思っていましたが、どういうわけか「それらを順番に取得」しました。

何かご意見は?ありがとうございました。

0 投票する
1 に答える
491 参照

apache-kafka - AdminUtils.createTopic API が kafka.admin.AdminOperationException をスローする

Windows で Confluent 3.0.1 プラットフォームを使用しています。インストール ガイドと開発者ガイドに従って、すべてのインストールを行い、トポロジを開発しました。

Zookeeper、次に Kafka サーバーを開始し、トポロジーを実行しようとしました。しかし、Kafkaサーバーでエラーを下回っています。トピックを手動で作成してトポロジを実行しても、同じエラーが表示されます。

そして、私のトポロジコードは以下の通りです:

以下は、別のJavaソースファイルの一部である私が使用しているプロパティです。

0 投票する
1 に答える
883 参照

apache-kafka - Kafka Streams の「マップ側」は、辞書ルックアップのように結合します

この質問は、 Kafka Streams with lookup data on HDFSのフォローアップです。小さな辞書データをメインの Kafka ストリームに (「マップ側」結合のように) 結合する必要があります

私の知る限り、Kafka Stream インスタンスは常にトピックの特定のパーティションで動作します。検索を行うには、結合キーの両方のストリームを再分割して、関連するレコードをまとめる必要がありました。

複数のルックアップ データをチェックする必要がある場合、何度もパーティションを再分割するコストはいくらですか? ルックアップ データセット全体を各パーティションに送信することはできないためKTable、ルックアップ トピックから作成すると、すべての Kafka Stream アプリケーション インスタンスにデータ セット全体が表示されます。したがって、私がKStream#transform()持っているすべてのルックアップ データを使用して、ローカルの RocksDB ストアを取得するメソッドでルックアップを実行できます。

どのオプションがより適切か疑問に思っています:

  • トピックの各パーティションに同じデータ (データ セット全体) を挿入し、 でルックアップを実行しますKStream#transform。トピックが過剰に分割されている場合、多くの重複データが発生しますが、小さなデータセットの場合、これは問題になりません。

  • ルックアップ (結合) を実行できるように、DSL API を使用して両方のストリームを再分割します。ここでのパフォーマンスの意味は何ですか?