1

私の知る限り、

並列処理を実装するために、kafka のパーティションと (コンシューマー) グループの概念が導入されました。私はpythonを介してkafkaを使用しています。(たとえば) 2 つのパーティションがある特定のトピックがあります。つまり、2 つのコンシューマーを含むコンシューマー グループを開始すると、それらは異なるパーティションにマップ (サブスクライブ) されます。

しかし、kafkaPython でライブラリを使用すると、奇妙な問題が発生しました。基本的に同じグループ ID を持つ 2 つのコンシューマーを開始し、それらがメッセージを消費するためのスレッドを開始しました。

しかし、kafka-stream 内のすべてのメッセージは両方によって消費されています!! これはばかげているように思えますし、概念的にも正しくありません。コンシューマーを特定の (異なる) パーティションに手動でマップできる方法はありますか (それらが別のパーティションに自動的にマップされていない場合)。

コードは次のとおりです。

from kafka import KafkaConsumer
import thread

def con1(consumer):
    for msg in consumer:
        print msg

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))

以下は、kafka-console-producer を使用して作成したいくつかのメッセージの出力です。

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')

期待されていたのはそれぞれの1つでした。ところで、このトピックk-testには 2 つのパーティションがあります。

4

4 に答える 4

0

bin/kafka-consumer-groups.sh コマンド ライン ツールを実行して、使用している Python Kafka クライアントが適切なコンシューマー グループ管理をサポートしているかどうかを確認してください。両方のコンシューマーが実際に同じグループに属している場合、相互に排他的なパーティションからメッセージを取得する必要があります。

于 2017-06-06T05:06:38.950 に答える