問題タブ [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka-streams - KafkaStreams にシーケンスを格納する StateStoreSupplier
(外部結合を使用してマージされた) 2 つのトピックからのデータの順序を変更する必要があります。を使用しStateStore
て最新のシーケンスを保持し、再シーケンスされたメッセージでダウンストリーム ストリームの値を変更することをお勧めします。
単純化された問題:
(トピック A からの seq、トピック B からの seq) -> 出力への新しい seq (現在のシーケンスを に保持StateStore
)
(10,100) -> 1
(11,101) -> 2
(12,102) -> 3
(...,...) -> ...
新しいシーケンスは、キー「currentSeq」の値として stateStore に格納されます。シーケンスはメッセージごとにインクリメントされ、stateStore に保存されます。
apache-kafka - Kafka ストリーム - kafka-streams-application-reset.sh が間違った API バージョンを送信する
Kafka 0.10.0.1 は、kafka-streams-application-reset.sh というスクリプトを使用して Kafka Streams アプリをリセットする機能を追加します
Confluent には、このスクリプトに関する適切なドキュメントがいくつかあります。
残念ながら、スクリプトをローカルで実行するとエラーが発生します。
また、次の行がブローカー ログに表示されます。
私が知る限り、クライアントが無効なリクエストを行っているように見えますが、なぜそうなったのかはわかりません。当社のブローカーはまだ 0.9.0 を使用しているため、それが問題かどうかはわかりませんが、ブローカー ログの値に基づくメタ データ リクエストのようです。apiKey
誰かがなぜこれが起こっているのか、どうすれば解決できるのか教えてもらえますか?
apache-kafka - Kafka ストリーム: ストリームで最小値を見つける適切な方法
Kafka Streams バージョン 0.10.0.1 を使用しており、ストリームで最小値を見つけようとしています。
着信メッセージは kafka-streams-topic というトピックから来ており、キーがあり、値は次のような JSON ペイロードです。
これは単純なペイロードですが、この JSON の最小値を見つけたいと考えています。
発信メッセージは単なる数字です: 2334
キーもメッセージの一部です。
したがって、受信トピックが次のようになった場合:
min-topic という名前の発信トピックは、
別のメッセージが届きます:
これは同じキーであるため、最初のメッセージよりも小さいため、key=1 value=100 のメッセージを生成したいと考えています。
今、私たちが得たとしましょう:
次の場所に新しいメッセージが作成されます。
さらに、次のメッセージが表示された場合:
このメッセージは現在の値の 100 より大きいため、メッセージは生成されません。
これは機能しますが、これが API の意図に準拠しているかどうか疑問に思っています。
実際にプロセッサを実行するコードは次のとおりです。
apache-kafka-streams - ルックアップ データで KStream を強化する理想的な方法
私のストリームには「カテゴリ」という列があり、別のストアに「カテゴリ」ごとに追加の静的メタデータがあり、数日に 1 回更新されます。このルックアップを行う正しい方法は何ですか? Kafka ストリームには 2 つのオプションがあります
Kafka Streams の外部で静的データをロード
KStreams#map()
し、メタデータを追加するために使用します。Kafka Streams は単なるライブラリであるため、これが可能です。メタデータを Kafka トピックにロードし、それを a にロードして
KTable
do を実行しますKStreams#leftJoin()
。これはより自然に見え、パーティショニングなどは Kafka Streams に任せます。KTable
ただし、これにはすべての値をロードしたままにしておく必要があります。変更だけでなく、ルックアップ データ全体をロードする必要があることに注意してください。- たとえば、最初はカテゴリ「c1」が 1 つだけだったとします。Kafka ストリーム アプリは正常に停止され、再び再起動されました。再起動後、新しいカテゴリ「c2」が追加されました。私の推測では、table = KStreamBuilder().table('metadataTopic') の値は「c2」になるだけです。これは、アプリが 2 回目に開始されてから変更された唯一のものであるためです。「c1」と「c2」が必要です。
- 「c1」もある場合、データは KTable から削除されますか (おそらく、送信キー = null メッセージを設定することにより?)?
上記のうち、メタデータを検索する正しい方法はどれですか?
再起動時に常に 1 つのストリームのみを最初から読み取るように強制することは可能ですか?これは、すべてのメタデータを にロードできるようにするためKTable
です。
ストアを使用する別の方法はありますか?
apache-kafka - Apache Samza と Apache Kafka Streams の違い (並列処理と通信に注目)
Samza および Kafka Streams では、データ ストリーム処理は、処理ステップ (Samza では「ジョブ」、Kafka Streams では「プロセッサ」と呼ばれる) のシーケンス/グラフ (Samza では「データフロー グラフ」、Kafka Streams では「トポロジ」と呼ばれる) で実行されます。この質問の残りの部分では、これら 2 つの用語をワークフローとワーカーと呼びます。
非常に単純なワークフローがあるとします。これは、センサー測定値を消費し、50 未満のすべての値をフィルター処理するワーカー A と、残りの測定値を受け取り、80 を超えるすべての値をフィルター処理するワーカー B で構成されます。
入力 (Kakfa トピック X) --> (ワーカー A) --> (ワーカー B) --> 出力 (Kafka トピック Y)
分かっていたら
- http://samza.apache.org/learn/documentation/0.11/introduction/concepts.htmlおよび
- http://docs.confluent.io/3.1.1/streams/architecture.html#parallelism-model
正しくは、Samza と Kafka ストリームの両方がトピック パーティショニングの概念を使用してワークフロー/ワーカーを複製し、スケーラビリティのために処理を並列化します。
しかし:
Samza は、各ワーカー (つまり、ジョブ) を個別に複数のタスク (入力ストリームのパーティションごとに 1 つ) に複製します。つまり、タスクはワークフローのワーカーのレプリカです。
Kafka Streams は、ワークフロー全体 (つまり、トポロジ) を一度に複数のタスク (入力ストリームのパーティションごとに 1 つ) に複製します。つまり、タスクはワークフロー全体のレプリカです。
これは私の質問に私をもたらします:
パーティションが 1 つだけあると仮定します。Samza では可能ですが、Kafka Streams では 2 つの異なるマシンにワーカー (A) と (B) をデプロイすることはできないというのは正しいですか? (つまり、複数のパーティションがあるかどうかに関係なく、Kafka Streams で単一のタスク (つまり、トポロジ レプリカ) を 2 つのマシンに分割することは不可能ですか?)
(同じタスク内の) Kafka Streams トポロジ内の 2 つの後続のプロセッサはどのように通信しますか? (Samza では、後続の 2 つのワーカー (つまり、ジョブ) 間のすべての通信が Kafka トピックで行われることを知っていますが、Kafka トピックとして公開する必要があるストリームをコードで明示的に Kafka ストリームで「マーク」する必要があるため、これはできません。ここに当てはまります。)
Samza がすべての中間ストリームを Kafka トピックとして自動的に公開する (したがって、潜在的なクライアントが利用できるようにする) のは正しいですか? 一方、Kafka Streams
addSink
は、(低レベル API およびto
/またはthrough
DSLで) 明示的にマークされた中間および最終ストリームのみを公開します。 )?
(Samza は Kafka 以外のメッセージ キューも使用できるという事実を認識していますが、これは私の質問にはあまり関係ありません。)