問題タブ [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
json - JSON から Avro への Kafka ストリーム
Kafka Stream を使用して、文字列/JSON メッセージを含むトピックを Avro メッセージとして別のトピックに変換しようとしています。
ストリームの主な方法:
変身:
そして、次のような例外を取得します。
これは正しいアプローチですか?Kafka Streams と Avro は初めてです
apache-kafka - Kafka は、単一のログ イベント行を結合されたログ イベントに集約します
ログイベントの処理に Kafka を使用しています。単純なコネクタとストリーム変換に関する Kafka Connect と Kafka Streams の基本的な知識があります。
これで、次の構造のログ ファイルが作成されました。
ログ イベントには、event_id で接続された複数のログ行があります (メール ログなど)。
例:
一般に、複数のイベントがあります。
例:
時間枠 (START と END の間) は最大 5 分です。
結果として、次のようなトピックが必要です
例:
これを達成するための適切なツールは何ですか? Kafka Streams で解決しようとしましたが、方法を理解できます..
apache-kafka - Kafka ストリーム - メトリクス レジストリからのデータへのアクセス
Kafka Streams メトリック レジストリ内のデータにアクセスする方法に関するドキュメントを見つけるのに苦労しています。丸い穴に四角いペグをはめようとしているのかもしれません。以下の点についてアドバイスをいただければと思います。
ゴール
Kafka Streams メトリクス レジストリに記録されているメトリクスを収集し、これらの値を任意のエンドポイントに送信します
ワークフロー
これは私が実行する必要があると思うことであり、最後のステップを除くすべてのステップを完了しました (メトリクス レジストリが非公開であるため、このステップで問題が発生しました)。しかし、私はこれについて間違った方法で行っているかもしれません:
- インターフェイスを実装するクラスを定義し
MetricReporter
ます。Kafka がメソッドで作成するメトリックのリストを作成しますmetricChange
(たとえば、このメソッドが呼び出されるたびに、現在登録されているメトリックでハッシュマップを更新します)。 metric.reporters
構成プロパティでこのクラスを指定します- 現在のデータについて Kafka Streams メトリクス レジストリをポーリングするプロセスを設定し、値を任意のエンド ポイントに送信します。
いずれにせよ、メトリクス レジストリが公開されていないため、 Kafka 0.10.0.1では最後の手順は実行できないようです。正しいワークフローであるかどうか (そうではないように聞こえます..)、または Kafka Streams メトリックを抽出するプロセスを誤解している場合は、これを教えてください。
apache-spark - DSMS、ストーム、フリンクの違い
DSMS はデータ ストリーム管理システムに対応します。これらのシステムにより、ユーザーは、ユーザーによって削除されるまで継続的に実行されるクエリを送信できます。
Storm や Flink などのシステムは DSMS と見なすことができますか、それともより一般的なものですか?
ありがとう
java - 複数の同一の Kafka Streams トピックのマージ
ソースの 1 つに障害が発生した場合に高可用性を確保できるように、2 つの Kafka トピックが異なるソースからまったく同じコンテンツをストリーミングしています。Kafka Streams 0.10.1.0 を使用して 2 つのトピックを 1 つの出力トピックにマージしようとしています。これにより、失敗時にメッセージを見逃さず、すべてのソースが稼働しているときに重複がなくなります。
KStreamのメソッドを使用するleftJoin
と、トピックの 1 つ (セカンダリ トピック) は問題なくダウンできますが、プライマリ トピックがダウンすると、出力トピックには何も送信されません。これは、Kafka Streams 開発者ガイドによると、
KStream-KStream leftJoin は、常にプライマリ ストリームから到着するレコードによって駆動されます。
そのため、プライマリ ストリームからのレコードがない場合は、セカンダリ ストリームのレコードが存在しても使用しません。プライマリ ストリームがオンラインに戻ると、出力は正常に再開されます。
また、(重複レコードを追加する)を使用してouterJoin
から、重複を取り除くために KTable および groupByKey への変換を試みました。
しかし、私はまだ時々重複を取得します。commit.interval.ms=200
また、KTable を取得して出力ストリームに十分な頻度で送信するためにも使用しています。
複数の同一の入力トピックから正確に 1 回の出力を取得するために、このマージにアプローチする最良の方法は何でしょうか?
java - KStream でオフセット値を取得するにはどうすればよいですか
Kafka Streams で PoC を開発しています。次に、ストリーム コンシューマーでオフセット値を取得し、それを使用して(topic-offset)->hash
各メッセージの一意のキーを生成する必要があります。その理由は、プロデューサーが syslog であり、ID を持つプロデューサーはごくわずかだからです。再処理の場合は同じキーを再生成する必要があるため、コンシューマーで UUID を生成できません。
私の問題:org.apache.kafka.streams.processor.ProcessorContext
クラス.offset()
は値を返すメソッドを公開していますが、プロセッサの代わりに KStream を使用しており、同じものを返すメソッドが見つかりませんでした。
KStream から各行の消費者の値を抽出する方法を知っている人はいますか? 前もって感謝します。