9

私は最近 Kafka を使用していますが、消費者グループの消費者に関して少し混乱しています。混乱の中心は、コンシューマーをプロセスとして実装するかスレッドとして実装するかです。この質問では、高レベルのコンシューマーを使用していると仮定します。

私が実験したシナリオを考えてみましょう。私のトピックには 2 つのパーティションがあります (簡単にするために、レプリケーション ファクターが 1 であると仮定します)。groupでconsumer ( ConsumerConnector) プロセスを作成し、次にサイズ 2 のトピック カウント マップを作成し、そのプロセスの下で2 つのコンシューマー スレッドを生成しました。パーティションを消費しているようで、パーティションを消費しています。この動作は常に決定論的ですか? 以下はコードスニペットです。Classは、私のコンシューマ スレッド クラスです。consumer1group1consumer1_thread1consumer1_thread2consumer1_thread10consumer1_thread21TestConsumer

    ...
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(2));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...

consumer1ここで、2 つのコンシューマー プロセスを開始し、consumer2両方が同じグループgroup1を持ち、それぞれがシングル スレッド プロセスである別のシナリオ (私は実験していませんが、興味があります) を考えてみましょう。今私の質問は次のとおりです。

  1. この場合、2 つの独立したコンシューマー プロセス (同じグループの下にあるにもかかわらず) は、どのようにパーティションに関連付けられますか? 上記のシングル プロセス マルチスレッド シナリオとの違いは何ですか?

  2. 一般に、コンシューマー スレッドまたはプロセスは、トピック内のパーティションにどのようにマップまたは関連していますか?

  3. Kafka のドキュメントには、コンシューマー グループの下の各コンシューマーが 1 つのパーティションを消費すると書かれています。ただし、それは消費者スレッド (上記のコード例のように) または独立した消費者プロセスを指しますか?

  4. 消費者をプロセスとスレッドとして実装することに関して、ここで見逃している微妙なことはありますか? 前もって感謝します。

4

3 に答える 3

14

コンシューマー グループでは、複数のコンシューマー インスタンスを実行できます (同じ を持つ複数のプロセスgroup-id)。各パーティションを消費している間、グループ内の正確に 1 つのコンシューマー インスタンスによって消費されます。

たとえば、トピックに 2 つのパーティションが含まれていてgroup-A、2 つのコンシューマー インスタンスでコンシューマー グループを開始した場合、それぞれがトピックの特定のパーティションからメッセージを消費します。

異なるグループ ID で同じ 2 つのコンシューマーを開始するとgroup-Agroup-Bトピックの両方のパーティションからのメッセージがそれぞれにブロードキャストされます。したがって、その場合、実行中のコンシューマ インスタンスgroup-Aは、トピックの両方のパーティションからのメッセージを持ち、同じことが当てはまりgroup-Bます。

詳細については、ドキュメントを参照してください

編集:あなたのコメントに基づいて、

2 つのコンシューマ プロセスとは対照的に、同じプロセスの下に 2 つのコンシューマ スレッドを持つことの効果的な違いは何だろうと思っていました (どちらの場合もグループは同じです)。

コンシューマーgroup-idは、クラスター全体で同じ/グローバルです。2 つのスレッドでプロセス 1 を開始し、同じ groupId でさらに 2 つのスレッドを持つ別のプロセス (別のマシンにある可能性があります) を生成すると、kafka はこれらの 2 つの新しいスレッドを追加して、トピックからのメッセージを消費します。したがって、最終的には、同じトピックからの消費を担当する 4 つのスレッドが存在することになります。その後、Kafka はリバランスをトリガーしてパーティションをスレッドに再割り当てします。そのため、スレッドによって消費されていた特定のパーティションが、スレッドによってT1 of process P1消費されるように割り当てられる可能性がありますT2 of process P2。以下の数行は wiki ページからの引用です

新しいプロセスが同じコンシューマー グループ名で開始されると、Kafka は、そのプロセスのスレッドを、トピックを消費するために使用可能なスレッドのセットに追加し、「リバランス」をトリガーします。このリバランス中に、Kafka は使用可能なパーティションを使用可能なスレッドに割り当て、おそらくパーティションを別のプロセスに移動します。古いビジネス ロジックと新しいビジネス ロジックが混在している場合、一部のメッセージが古いロジックに送信される可能性があります。

于 2015-09-09T05:33:09.700 に答える
1

同じ ID を持つ複数のコンシューマー グループ インスタンスと単一のコンシューマー グループ インスタンスを選択する際の主な設計上の決定は、回復力です。たとえば、2 つのスレッドを持つ単一のコンシューマがある場合、このマシンがダウンすると、すべてのコンシューマが失われます。同じ ID を持つ 2 つの別個のコンシューマー グループがあり、それぞれが異なるホスト上にある場合、それらは障害に耐えることができます。理想的には、各コンシューマー グループは上記の 2 つのスレッドを持つ必要があります。したがって、1 つのホストがダウンした場合、他のコンシューマー グループは休止状態のスレッドを使用して他のパーティションを占有します。実際、この要因をカバーするには、パーティションよりも多くのスレッドを持つことが常に望ましいです。

  1. 各コンシューマー グループを異なるホストで実行できます。特定の名前/ID の単一のコンシューマー グループでは、単一のランタイム環境ですべてのスレッドを管理するため、単一のホストでのみ実行されます。
  2. Kafka には、さまざまなトピック パーティションを読み取るスレッド/コンシューマー グループを決定するアルゴリズムがあります。Kafka は、回復力のある方法でこれらを均等に分散しようとします。コンシューマー グループが失敗すると、他のグループの他のスレッドが特定のパーティションを読み取ることができるようになります。
  3. コンシューマ グループ内の単一のスレッドを参照します。パーティションよりも多くのスレッドがある場合、他のスレッドが回復力を提供できなくなるまで、一部のスレッドは休止状態のままになります。
  4. 優先度は回復力に関連しています。そのため、同じ ID を持つ複数のコンシューマー グループをセットアップすると、複数のホストで実行でき、アプリケーションが障害に耐えられるようになります。
于 2015-09-10T10:37:06.277 に答える
0

@ user2720864 からの詳細な回答に感謝しますが、回答に記載されている再割り当てのケース @ user2720864 は正しくないと思います => 1 つのパーティションを 2 人の消費者が消費することはできません。

(パーティションと比較して) より多くのコンシューマーがある場合、各パーティションは 1 つのコンシューマーのみに排他的に割り当てられますが、残りのコンシューマーは、一部の動作中のコンシューマーが停止するか、グループから削除されるまで怠惰なままになります。

Kafka Consumers ドキュメントに基づく:

Kafka での消費の実装方法は、ログ内のパーティションを消費者インスタンスに分割することで、各インスタンスがいつでもパーティションの「公平なシェア」の排他的な消費者になるようにします。グループのメンバーシップを維持するこのプロセスは、Kafka プロトコルによって動的に処理されます。新しいインスタンスがグループに参加すると、グループの他のメンバーからいくつかのパーティションを引き継ぎます。インスタンスが停止すると、そのパーティションは残りのインスタンスに分配されます。

また、「Consumer Groups and Topic Subscriptions」セクションのAPI 仕様:

これは、各パーティションがグループ内の1 つのコンシューマーにのみ割り当てられるように、コンシューマー グループ内のすべてのメンバー間でパーティションのバランスを取ることによって実現されます。

于 2019-07-16T02:37:16.343 に答える