私の知る限り、
並列処理を実装するために、kafka のパーティションと (コンシューマー) グループの概念が導入されました。私はpythonを介してkafkaを使用しています。(たとえば) 2 つのパーティションがある特定のトピックがあります。つまり、2 つのコンシューマーを含むコンシューマー グループを開始すると、それらは異なるパーティションにマップ (サブスクライブ) されます。
しかし、kafka
Python でライブラリを使用すると、奇妙な問題が発生しました。基本的に同じグループ ID を持つ 2 つのコンシューマーを開始し、それらがメッセージを消費するためのスレッドを開始しました。
しかし、kafka-stream 内のすべてのメッセージは両方によって消費されています!! これはばかげているように思えますし、概念的にも正しくありません。コンシューマーを特定の (異なる) パーティションに手動でマップできる方法はありますか (それらが別のパーティションに自動的にマップされていない場合)。
コードは次のとおりです。
from kafka import KafkaConsumer
import thread
def con1(consumer):
for msg in consumer:
print msg
consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))
以下は、kafka-console-producer を使用して作成したいくつかのメッセージの出力です。
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
期待されていたのはそれぞれの1つでした。ところで、このトピックk-test
には 2 つのパーティションがあります。