5

多数のパーティションを持つトピックがあるとします。そこに K/V データを書き込んでおり、そのデータをタンブリング ウィンドウにキーで集約したいと考えています。

パーティションと同じ数のワーカー インスタンスを起動し、各ワーカー インスタンスが別のマシンで実行されているとします。

結果の集計に各キーのすべての値が含まれるようにするにはどうすればよいですか? IE 各ワーカーインスタンスに値のサブセットを持たせたくありません。

これはStateStoreが使用されるものですか? Kafka はこれを独自に管理しますか、それとも方法を考え出す必要がありますか?

4

2 に答える 2

9

結果の集計に各キーのすべての値が含まれるようにするにはどうすればよいですか? IE 各ワーカーインスタンスに値のサブセットを持たせたくありません。

一般に、Kafka Streams は、同じキーのすべての値が同じ (そして 1 つだけの) ストリーム タスクによって処理されることを保証します。これは、1 つのアプリケーション インスタンス (「ワーカー インスタンス」と説明したもの) のみがその値を処理することも意味します。鍵。アプリ インスタンスは 1 つ以上のストリーム タスクを実行できますが、これらのタスクは分離されていることに注意してください。

この動作はデータのパーティショニングによって実現され、Kafka Streams はパーティションが常に同じ 1 つのストリーム タスクのみによって処理されるようにします。キー/値への論理的なリンクは、Kafka と Kafka Streams では、キーが常に同じパーティションに送信されることです (ここに落とし穴がありますが、この質問)、したがって、1 つの特定のパーティション (考えられる多くのパーティションの中で) には、同じキーのすべての値が含まれています。

A2 つのストリームと を結合する場合など、状況によってはB、両方のストリームからのデータが同じストリーム タスク内に同じ場所にあることを確認するために、集約が同じキーで動作することを確認する必要があります。関連する入力ストリーム パーティションと一致するキー (それぞれ fromAB) が同じストリーム タスクで利用できるようにします。ここで使用する典型的な方法はselectKey(). それが完了すると、Kafka Streams は、2 つのストリーム A と B を結合するため、および結合された出力ストリームを作成するために、同じキーのすべての値が同じストリーム タスクによって処理され、したがって同じアプリケーション インスタンスによって処理されることを保証します。

例:

  • ストリームには値Aを持つキーがあります。userId{ georegion }
  • ストリームには値Bを持つキーがあります。georegion{ continent, description }

2 つのストリームの結合は、両方のストリームが同じキーを使用する場合にのみ機能します (Kafka 0.10.0 以降)。Aこの例では、これは、結果のキーが からuserIdに変更されるように、ストリームを再キー化 (したがって再パーティション化) する必要があることを意味しますgeoregion。そうしないと、Kafka 0.10 以降、実際に結合を実行する責任があるストリーム タスクにデータが同じ場所に配置されていないためA、結合できません。B

この例では、次の方法でストリームを再キー/再パーティション化できますA

// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")

// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))

このthrough()呼び出しは、実際に再パーティション化をトリガーするために Kafka 0.10.0 でのみ必要であり、Kafka の以降のバージョンではこれらが自動的に行われます (この今後の機能は既に完了しており、Kafka で利用可能ですtrunk)。

これは StateStore が使用されるものですか? Kafka はこれを独自に管理しますか、それとも方法を考え出す必要がありますか?

一般的に、いいえ。上記の動作は、状態ストアではなく、パーティショニングによって実現されます。

ストリームに対して定義した操作のためにステート ストアが関係する場合があります。たとえば、ウィンドウ操作では状態を管理する必要があるため、バックグラウンドで状態ストアが作成されます。しかし、実際の質問 - 「結果の集計に各キーのすべての値が含まれることを保証する」 - は状態ストアとは関係なく、パーティション化の動作に関するものです。

于 2016-09-02T07:37:16.480 に答える
1

ワーカー インスタンスとは、Kafka Streams アプリケーション インスタンスのことだと思いますよね? (Kafka Streams にはマスター/ワーカー パターンがないため、これはフレームワークではなくライブラリであるため、「ワーカー」という用語は使用しません。)

キーごとにデータを同じ場所に配置する場合は、キーごとにデータを分割する必要があります。したがって、データが最初からトピックに書き込まれるときに、外部プロデューサーによってデータがキーによって分割されます。または、Kafka Streams アプリケーション内で明示的に新しいキーを設定し (selectKey()または などを使用map())、 への呼び出しを介して再配布しthrough()ます。(将来のリリースでは、への明示的な呼び出しthrough()は必要ありません。つまり、0.10.1Kafka Streams は、必要に応じてレコードを自動的に再配布します。)

メッセージ/レコードを分割する必要がある場合、キーはnull. プロデューサー構成を使用してパーティション分割スキーマを変更することもできますpartitioner.class( https://kafka.apache.org/documentation.html#producerconfigsを参照)。

StateStores は通常、分割されたデータの上で使用されますが、分割は StateStores から完全に独立しています。

于 2016-09-01T06:32:46.487 に答える