3

私は Kafka を初めて使用し、その上でメッセージング プラットフォームにサービスを提供するサービスを構築しようとしています。これが私のセットアップです:

Kafka 0.9.0.1
Zookeeper 3.4.8 kafka
-python 1.3.3

私のアプリケーションは、KafkaProducerメッセージのストリームを 6 つのパーティションを持つ単一のトピックに送信する を作成します。また、7 つの s を作成しますKafkaConsumer(単一の の下で、group_idそのうちの 6 つが 6 つのパーティションに割り当てられ、1 つがアイドル状態のままになります (これは予想されることです)。プロデューサーがストリーミングしている間、期待して、パーティション数を 7 に増やします)。ストリームは 7 つのパーティションに分散されず、アイドル状態のコンシューマーを起動します.ただし、アプリケーションを再起動して再初期化するまで、プロデューサーは新しく追加されたパーティションを取得しないようです.パーティション数をスケーリングしますこれを実行して:

kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7

プロデューサーがパーティション数の変更を再初期化せずに取得する方法はありますか?

関連するコード スニペットは次のとおりです。

プロデューサー

class Producer(threading.Thread):
daemon = True

def __init__(self, name, manager):
    super(Producer, self).__init__()
    self.producer = KafkaProducer(bootstrap_servers='localhost:9092')

def run(self):
    while not self.killed:
        if not self.q.empty():
            self._busy()
            self.producer.send('test', value=self.q.get())
        else:
            self._free()

消費者

class Consumer(threading.Thread):
    daemon = True

    def __init__(self, name, manager):
        super(Consumer, self).__init__()
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 group_id='test_group',
                                 client_id="Consumer " + self.name)
        self.consumer.subscribe(['test'])

    def run(self):
        while not self.killed:
            messages = self.consumer.poll()

            for topic, records in messages.iteritems():
                print self.consumer.config['client_id'] + ": " + str(records)
4

1 に答える 1