問題タブ [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
1685 参照

apache-kafka - StreamsException: 抽出されたタイムスタンプ値が負であるため、許可されていません

これは、 Kafka-node を使用した Kafka ストリームのエラー - 負のタイムスタンプ の複製である可能性がありますが、そうではありません。私の Kafka Streams アプリは、各メッセージに対していくつかの変換ロジックを実行し、それを新しいトピックに転送します。アプリには時間ベースの集計/処理がないため、カスタム タイムスタンプ エクストラクタを使用する必要はありません。このアプリは数日間正常に動作していましたが、突然アプリが負のタイムスタンプ例外をスローしました。

すべての StreamThreads (合計 10 個) からこの例外をスローした後、数時間ストリームでそれ以上の進行がなかったため、アプリは一種のフリーズしました。その後、例外はスローされませんでした。アプリを再起動すると、新着メッセージのみの処理が開始されました。

問題は、間に入ったメッセージに何が起こったのかです (例外をスローした後、アプリを再起動する前に)。欠落しているメッセージにタイムスタンプが埋め込まれていなかった場合 (ブローカーとプロデューサーで変更が発生していないため、非常に不可能です)、そのようなメッセージごとにアプリが例外をスローする必要があったのではないでしょうか? または、最初にメッセージで負のタイムスタンプを検出したときに、アプリがストリームの進行を停止するようなものではありませんか? 負のタイムスタンプが検出された後でもアプリがストリームを進行できるように、この状況を処理する方法はありますか?私のアプリは Kafka Streams ライブラリ バージョン 0.10.0.1-cp1 を使用しています。

注: 各メッセージの負のタイムスタンプをチェックできるカスタム タイムスタンプ エクストラクタを簡単に作成できますが、それは私のアプリにとって不要なオーバーヘッドです。私が理解したいのは、負のタイムスタンプを持つメッセージを検出した後、ストリームが進行しなかった理由だけです。

0 投票する
1 に答える
1341 参照

kafka-consumer-api - Kafka-Stream を使用して間隔で Kafka からレコードを読み取る方法

Kafka-Stream コンシューマーで使用して Kafka からレコードを読み取りたいのですが、指定された間隔ごとにレコードを読み取るオプションがありますか? たとえば、1分ごとですか?

0 投票する
0 に答える
4322 参照

java - Kafka Streams DSL ウィンドウを使用して Java オブジェクトをリストに集約する

Kafka Streams DSL の最も単純な使用例があります。CSV センサーデータを読み取り、タイムスタンプでグループ化し、出力します。次のコードはコンパイルされません。

のため

エラー:(90, 45) java: シンボルが見つかりません symbol: メソッド add(java.lang.Object) 場所: タイプ java.lang.Object の変数リスト

変。

コメントとしてキャストすると、次のランタイム例外が発生します (ウィンドウ化された累積を出力する直前)。

addのメソッドをデバッグするとSensorDataAccumulator、手がかりが得られるはずです。

ここに画像の説明を入力

だから、私が正しく理解していれば、私は を保持してArrayList list = new ArrayList<SensorData>();いますが、実際には、プロセスのどこかでメンバーが に変更されLinkedTreeMapます。タイプチェッカーはここで私を失いました...

これLinkedTreeMapは、GSON が myJsonDeserializer およびJsonSerializerクラスに使用する基礎となるデータ構造です。したがって、完全を期すためにこれらを以下に追加します。

現在、何が間違っているのか、どこを修正すればよいのかわかりません。異なるシリアライザー、異なるデータ構造を使用する必要がありますか? 別の言語 ;) ?

任意の入力を歓迎します。

0 投票する
2 に答える
3031 参照

apache-kafka - Kafka ストリーム - 2 つの ktables を結合すると、join 関数が 2 回呼び出されます

2 つの KTables に参加しようとしています。

マージ関数は非常に単純です。ある Bean から別の Bean に値をコピーするだけです。

しかし、何らかの理由で、結合関数が 1 つの生成されたレコードに対して 2 回呼び出されています。以下のストリーミング/プロデューサー構成を参照してください

プロデューサー構成 -

次に、ストリームごとに 1 つのレコードを送信しています。両方のレコードのキーは同じです。出力として単一のレコードを受け取ることを期待しています。

しかし、ValueJoiner は 2 回トリガーし、1 つではなく 2 つの同一の出力レコードを取得しています。トリガー時間中 - 両方のストリームからの両方の値が存在し、2 番目の実行をトリガーしているものを取得できません。

参加しないと、この動作を再現できません。2 ktable join の実例が見つからないため、私のアプローチの何が問題なのか理解できません。

同じ動作を示す簡単なコードを追加する

0 投票する
1 に答える
6614 参照

apache-kafka - KafkaStreams - InconsistentGroupProtocolException

次のように、Kafka Streams DSL を使用して Kafka クラスターに接続する Kafka Streams アプリがあります。

そして、コンシューマー クライアントを直接使用してクラスターへの接続を確立するコード ベースの別の部分。

これを行っている理由は、アプリの他の部分 (Kafka Streams トポロジーを含む) を条件付きで開始する前に、コンシューマー グループに関するメタ データを収集するためです。InconsistentGroupProtocolExceptionこれを行う方法はおそらく他にもありますが (たとえば、さまざまなフックを使用するなど)、これらのメソッドを混在させると (断続的に)スローされることがある理由についてもっと知りたいです。

なぜこれがスローされているのか、誰かが光を当てることができますか? ソースコード自体から正確に何が起こっているのかを判断するのに苦労していますが、Kafka Streams によって構築された基礎となるコンシューマーは、クライアントとは異なるパーティショニング プロトコルを指定していると思いKafkaConsumerます。とにかく、この例外を理解するための助けは大歓迎です