11

私のストリームには「カテゴリ」という列があり、別のストアに「カテゴリ」ごとに追加の静的メタデータがあり、数日に 1 回更新されます。このルックアップを行う正しい方法は何ですか? Kafka ストリームには 2 つのオプションがあります

  1. Kafka Streams の外部で静的データをロードKStreams#map()し、メタデータを追加するために使用します。Kafka Streams は単なるライブラリであるため、これが可能です。

  2. メタデータを Kafka トピックにロードし、それを a にロードしてKTabledo を実行しますKStreams#leftJoin()。これはより自然に見え、パーティショニングなどは Kafka Streams に任せます。KTableただし、これにはすべての値をロードしたままにしておく必要があります。変更だけでなく、ルックアップ データ全体をロードする必要があることに注意してください。

    • たとえば、最初はカテゴリ「c1」が 1 つだけだったとします。Kafka ストリーム アプリは正常に停止され、再び再起動されました。再起動後、新しいカテゴリ「c2」が追加されました。私の推測では、table = KStreamBuilder().table('metadataTopic') の値は「c2」になるだけです。これは、アプリが 2 回目に開始されてから変更された唯一のものであるためです。「c1」と「c2」が必要です。
    • 「c1」もある場合、データは KTable から削除されますか (おそらく、送信キー = null メッセージを設定することにより?)?

上記のうち、メタデータを検索する正しい方法はどれですか?

再起動時に常に 1 つのストリームのみを最初から読み取るように強制することは可能ですか?これは、すべてのメタデータを にロードできるようにするためKTableです。

ストアを使用する別の方法はありますか?

4

3 に答える 3

14
  1. Kafka Streams の外部で静的データをロードし、KStreams#map() を使用してメタデータを追加します。Kafka Streams は単なるライブラリであるため、これが可能です。

これは機能します。ただし、入力ストリームを強化するためのサイド データは通常、完全に静的ではないため、通常はリストされた次のオプションを選択します。むしろ、変化していますが、あまり頻繁ではありません。

  1. メタデータを Kafka トピックにロードし、それを KTable にロードして KStreams#leftJoin() を実行します。これはより自然に見え、パーティショニングなどは Kafka Streams に任せます。ただし、これには、KTable にすべての値をロードしたままにしておく必要があります。変更だけでなく、ルックアップ データ全体をロードする必要があることに注意してください。

これは通常のアプローチであり、特別な理由がない限り、この方法に従うことをお勧めします。

ただし、これには、KTable にすべての値をロードしたままにしておく必要があります。変更だけでなく、ルックアップ データ全体をロードする必要があることに注意してください。

ですから、あなたも 2 番目のオプションを好むと思いますが、これが効率的かどうかについて懸念しています。

簡単な答えは次のとおりです。はい、KTable にはキーごとにすべての (最新の) 値がロードされます。テーブルにはルックアップ データ全体が含まれますが、KTable はバックグラウンドで分割されていることに注意してください。たとえば、(テーブルの) 入力トピックにパーティションがある場合、アプリケーションのインスタンス3まで実行できます。3そのうち1、テーブルのパーティションを取得します (データがパーティション全体に均等に分散されていると仮定すると、テーブルの各パーティション/共有はテーブルのデータの約 1/3 を保持します)。したがって、実際には「うまくいく」可能性が高くなります。以下で詳細を共有します。

グローバル KTables:または、 (分割された) 通常のテーブル バリアントの代わりにグローバル KTablesを使用できます。グローバル テーブルでは、アプリケーションのすべてのインスタンスにテーブル データの完全なコピーがあります。これにより、質問に従って KStream を強化するなど、結合シナリオでグローバルテーブルが非常に役立ちます。

再起動時に常に 1 つのストリームのみを最初から強制的に読み取ることは可能ですか?これは、すべてのメタデータを KTable にロードできるようにするためです。

それについて心配する必要はありません。簡単に言えば、使用可能なテーブルのローカル「コピー」がない場合、Streams API は自動的にテーブルのデータが最初から完全に読み取られるようにします。利用可能なローカル コピーがある場合、アプリケーションはそのコピーを再利用します (そして、テーブルの入力トピックで新しいデータが利用可能になるたびにローカル コピーを更新します)。

例を含むより長い答え

の次の入力データ (変更ログ ストリームと考えてください) を想像してください。この入力がどのようにメッセージKTableで構成されているかに注目してください。6

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)

そして、この入力から生じる「論理」のさまざまな状態は次のとおりです。KTable新しく受信した各入力メッセージ ( など(alice, 1)) は、テーブルの新しい状態になります。

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

ここでわかることは、入力データに非常に多くのメッセージ (または、あなたが言ったように「変更」; ここでは6) がある場合でも、結果のエントリ/行の数KTable(これは、新しく受信した入力で) は、入力内の一意のキーの数です (ここでは、 から始まり1、 まで増加し3ます)。これは通常、メッセージの数よりも大幅に少なくなります。したがって、入力内のメッセージの数が でNあり、これらのメッセージの一意のキーの数が であるM場合、通常、M << N(Mは よりも大幅に小さくなりNます。さらに、不変条件はM <= Nです)。

これが、キーごとに最新の値のみが保持されるため、「KTable にすべての値をロードしておく必要がある」ことが通常問題にならない最初の理由です。

役立つ 2 つ目の理由は、Matthias J. Sax が指摘したように、Kafka Streams が RocksDB をそのようなテーブル (より正確には、テーブルをバックアップする状態ストア) のデフォルトのストレージ エンジンとして使用することです。RocksDB を使用すると、アプリケーションの使用可能なメイン メモリ/Java ヒープ領域よりも大きなテーブルを維持できます。これは、テーブルがローカル ディスクに流出する可能性があるためです。

最後に、3 つ目の理由は、aKTableが分割されていることです。したがって、テーブルの入力トピックが (たとえば)3パーティションで構成されている場合、舞台裏で起こっていることは、KTableそれ自体が同じ方法でパーティション化されている (シャード化されていると考えてください) ことです。上記の例では、最終的に次のようになりますが、正確な「分割」は、元の入力データがテーブルの入力トピックのパーティション全体にどのように分散されているかによって異なります。

論理 KTable (上で示した最後の状態):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

パーティション化された実際の KTable (3テーブルの入力トピックのパーティションに加えて、キー = ユーザー名がパーティション全体に均等に分散されていると仮定):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

実際には、この入力データの分割により、特に KTable の実際の表現を「サイズ調整」できます。

もう一つの例:

  • KTable の最新の状態が通常 1 TB のサイズになると想像してください (ここでも、おおよそのサイズは、テーブルの入力データ内の一意のメッセージ キーの数に関連するメッセージ値の平均サイズを掛けた関数です)。
  • テーブルの入力トピックに1パーティションのみがある場合、KTable 自体にも1パーティションのみがあり、サイズは 1 TB です。ここでは、入力トピックには1パーティションしかないため、最大でアプリ インスタンスを使用してアプリケーションを実行でき1ます (つまり、実際には多くの並列処理ではありません)。
  • テーブルの入力トピックに500パーティションがある場合、KTable にも500パーティションがあり、それぞれのサイズは ~ 2 GB です (データがパーティション全体に均等に分散されていると仮定)。500ここでは、最大 1 個のアプリ インスタンスでアプリケーションを実行できます。正確にインスタンスを実行する500場合、各アプリ インスタンスは1論理 KTable のパーティション/シャードを正確に取得するため、最終的に 2 GB のテーブル データになります。インスタンスのみを実行する100場合、各インスタンスは500 / 100 = 5テーブルのパーティション/シャードを取得し、最終的に約2 GB * 5 = 10 GBのテーブル データになります。
于 2016-12-08T09:12:08.327 に答える