私は pykafka を使用して kafka トピックからメッセージを取得し、いくつかの処理を行ったうえで mongodb を更新しています。pymongo は 1 回につき 1 件の項目しか更新できないため、100 個のプロセスを起動しています。しかし、起動時に一部のプロセスで「PartitionOwnedError」および「ConsumerStoppedException」というエラーが発生しました。原因が分かりません。よろしくお願いします。
# Consume messages from a Kafka topic through a pykafka balanced consumer and
# upsert each message into MongoDB, keyed by the message's "id" field.
kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]

# NOTE(review): when many processes join the same consumer group at once, the
# rebalance may give up before every partition has been released, which would
# match the PartitionOwnedError / ConsumerStoppedException seen at start-up.
# Consider passing larger `rebalance_max_retries` / `rebalance_backoff_ms`
# values here, and confirm the process count does not exceed the topic's
# partition count -- verify both against the pykafka documentation.
balanced_consumer = topic.get_balanced_consumer(
    consumer_group=group,
    auto_commit_enable=kafka_cfg['auto_commit_enable'],
    zookeeper_connect=kafka_cfg['zookeeper_list'],
    zookeeper_connection_timeout_ms=kafka_cfg['zookeeper_conn_timeout_ms'],
    consumer_timeout_ms=kafka_cfg['consumer_timeout_ms'],
)

# The outer loop re-enters the consumer's iterator every time it finishes
# (presumably when consumer_timeout_ms elapses with no message -- confirm).
# Exceptions raised by the iteration itself are NOT caught below; only the
# per-message processing is guarded.
while True:
    for msg in balanced_consumer:
        if msg is None:
            continue
        try:
            # SECURITY: eval() executes arbitrary code contained in the
            # message payload. If producers are not fully trusted, switch to
            # a safe parser (e.g. ast.literal_eval or json.loads, depending
            # on the wire format).
            value = eval(msg.value)
            # `long` keeps the original Python 2 behaviour for the _id type.
            doc_id = long(value.pop("id"))
            value["when_update"] = datetime.datetime.now()
            query = {"_id": doc_id}
            # upsert=True: insert the document when no match exists.
            result = collection.update_one(query, {"$set": value}, upsert=True)
        except Exception as e:
            # Best effort: log the failing message and keep consuming.
            log.error("Fail to update: %s, msg: %s", e, msg.value)
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
self._raise_worker_exceptions()
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
raise ex
pykafka.exceptions.PartitionOwnedError