問題タブ [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - KStream - Windowed カウントを新しい文字列、文字列トピックに変換する
(ウィンドウ化) に実行さ.countByKey()
れ、.forEach()
.
ウィンドウ化された値を取得し、それらを開始値と終了値とペアにして、これを新しいストリームに入れたいと思います。「従来の」プロデューサーでこれを行うことができますが、すべての作業を 1 つのアプリに保持したいと考えています (新しいトピックの値を処理する 2 つ目のアプリを用意するのではなく)。
はKTable<Windowed<String>, Long>.forEach()
として入ってきますが、これを (またはを介して) KStream<String, String>トピックにチェーンする明白な演算子は見当たりません。.to()
.through()
これは可能ですか?質問に意味はありますか?
java - Kafka KStream - 大幅な起動遅延
KStreamsベースのアプリケーションで問題が発生しています。アプリケーションは一度実行され、停止/再起動すると「スタック」し、作成したさまざまなトピックを削除するまで進行しません。これは毎回ではありませんが、頻繁に発生します。
通常、これは、新しい (er) バージョンを作業用 VM にコピーするときに発生します (速度上の理由から、kafka クラスターと同じサブネット内にあります)。
それがくさびになったとき、私は見るでしょう。
- "接続":
org.apache.zookeeper.ZooKeeper - Initiating client connection
- "クライアント":
[StreamThread-1] INFO o.a.k.s.p.internals.StreamTask - Creating restoration consumer client
- "Ping" : これらが表示され、アプリが正常にシャットダウンされません。それは殺されなければなりません。
これらのケースのいずれにおいても、通常、メッセージは無期限に繰り返されます (まあ、少なくとも昼食と会議の間ずっと。つまり、長すぎます)。
これが発生する前に、アプリは「正常に」シャットダウンしています。
私は何を間違っていますか?
編集:
この最新の時間-20分後、一連のエラーが発生しました:
org.apache.kafka.common.errors.TimeoutException: ブローカーからのメタデータの要求中にタイムアウトが発生したため、101 レコードを含むバッチが期限切れになりました
に続く:
org.apache.kafka.clients.consumer.CommitFailedException: グループがすでに再調整され、パーティションが別のメンバーに割り当てられているため、コミットを完了できません
-->他にメンバーがいないので、これは良いトリックです。
siddhi - Kafka Streams に対して Siddhi CEP ライブラリ実行プランを実行できますか?
KafkaStreams で Siddhi CEP ライブラリを実行したいのですが、既に独自の「ストリーム」の概念があるようです。KafkaStreams をプラグインして、代わりに KafkaStreams で実行される Siddhi 実行プランを有効にするにはどうすればよいですか?
Diliniのコメントに対処する編集>>
参照先: Kafka Streams ドキュメント
Kafka Streams は現在、私が見つけた CEP フレームワークと緊密に統合されていません。たとえば、Apache Flink には独自の Flink CEP があります。したがって、私は Siddhi CEP を Kafka Streams 上の抽象化レイヤーとして使用し、2 つのテクノロジーを緊密に統合することを想定しています。例えば、
- Siddhi CEP ストリームを作成すると、Kafka トピックと Kafka ストリームが自動的に作成され、そのトピックに投稿されたイベントがストリーミングされます。
- Siddhi CEP 出力ストリームを作成すると、Kafka トピックが作成されて公開されます
- Siddhi CEP の「イベント テーブル」は、ローカルの Kafka Streams の「状態ストア」または Kafka トピックのいずれかである可能性があります。これらは本質的に複製され、分割されたデータ ストアであるためです。これは、RDBMS やインメモリ イベント テーブルなどを使用する代わりに使用されます。
- Siddhi CEP Execution Plans の複雑なイベント クエリは、map()、filter() を実行し、Kafka Streams に参加してパターンなどをチェックする Kafka Streams Processor API に転送されます。
特に環境にすでに Kafka ストリームがある場合、複数の「ストリーム」フレームワークが必要な理由を理解しようとしています。
complex-event-processing - リアルタイムのルール編集と KafkaStreams
「ビジネス」ユーザーが入力ストリームから取り込まれたデータのルールを作成および編集できるようにする必要があります。ルールはビジネスに適したものでなければならず、開発サイクルがあってはなりません。これが私が考えていることの例です:
ウェザー ステーションとコネクテッド ビークルのストリーミング データが取り込まれるので、このルールを適用する必要がありますが、ルールが変わり、detectedPctWipersOn = 75% になった場合、ルールを再デプロイまたは再起動せずにリアルタイムで適用する必要があります。データ駆動型である必要があります。
Siddhi CEP (WSO2 CEP)は、「デプロイされた」ルールのみをサポートしているようです。特に KafkaStreams とうまく連携する場合、私が必要とするものを満たす他の CEP 製品はありますか?
自作する必要がある場合、この問題で推奨される「ストリーム」設計パターンはありますか?
apache-kafka - Kafka Streams を使用してストリーム データを集約する
次のようなコードで Kafka にメッセージを生成しています。
Kafka Streams (0.10.0.1) で過去 1 時間以内の合計メッセージ数をカウントしたい。私はそれを試しました:
私はKafka/Streamsが初めてです。どうすればいいですか?
apache-kafka - Spark Kafka Stream に対する Flink Kafka Stream の利点は? そして、Flink を介した Kafka Stream?
スパーク ストリームでは、ほぼリアルタイムのマイクロバッチ処理のバッチ間隔を設定します。Flink (DataStream) や Storm では、ストリームはリアルタイムなので、バッチ間隔という概念はないと思います。
Kafka では、コンシューマーがプルしています。Spark はバッチ間隔パラメーターを使用して Kafka ブローカーからメッセージをプルすると思いますが、Flink と Storm はどのようにそれを行うのでしょうか? Flink と Storm が高速ループで Kafka メッセージをプルしてリアルタイム ストリーム ソースを形成すると想像します。そうであれば、Spark のバッチ間隔を 100 ミリ秒、50 ミリ秒、またはさらに小さく設定した場合、Spark 間に大きな違いはありますか?ストリーミングとフリンクまたはストーム?
一方、Spark では、ストリーミング データが大きく、バッチ間隔が小さすぎる場合、処理待ちのデータが大量にあるという状況に遭遇する可能性があり、その結果、OutOfMemmory が発生することがわかります。Flink または Storm で発生しますか?
トピックからトピックへの変換を行うアプリケーションを実装しました。変換は簡単ですが、ソース データが膨大になる可能性があります (IoT アプリと考えると)。私のオリジナルの実装はreact-kafkaに支えられており、スタンドアロンの Scala/Akka アプリで問題なく動作します。必要に応じて Flink/Storm/Spark が既に存在するため、クラスター化するアプリケーションを実装しませんでした。次に、Kafka Stream を見つけました。私にとっては、クライアントの使用状況の観点からは、reactive-akka に似ています。では、スタンドアロン アプリケーションまたはマイクロサービスで Kafka ストリームまたはリアクティブ カフカを使用する場合、クライアント コードの信頼性/可用性について考慮する必要がありますか?
apache-kafka-streams - カスタム StateStore を Kafka Streams DSL プロセッサに追加する方法は?
私の Kafka ストリーム アプリの 1 つで、DSL と Processor API の両方の機能を使用する必要があります。私のストリーミングアプリの流れは
集約後、SINGLE 集約メッセージをシンクに送信する必要があります。だから私は以下のように私のトポロジを定義します
カスタムを定義し、StateStore
以下のようにプロセッサに登録します
アプリを実行すると、java.lang.NullPointerException
org.apache.kafka.streams.processor.internals.ProcessorStateManager の org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) でのスレッド "StreamThread-18" java.lang.NullPointerException の例外org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) の .flush(ProcessorStateManager.java:332) org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread) .java:446) org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) で org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) で) org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) で org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
ここで何がうまくいかないのですか?
apache-kafka - Kafka Stream StateStore はすべてのインスタンスでグローバルですか、それとも単にローカルですか?
Kafka StreamWordCount
の例では、StateStore
単語数を格納するために使用されます。同じコンシューマ グループに複数のインスタンスがある場合、StateStore
はグループに対してグローバルですか、それともコンシューマ インスタンスに対してローカルですか?
タナクス