私はkafkaとkafka-pythonにかなり慣れていません。kafka-python をインストールした後、ここからコンシューマー コードの単純な実装を試みました - http://kafka-python.readthedocs.io/en/master/usage.html
私はkafkaのbinディレクトリから消費者コードを書いていて、そこからpythonコードを実行しようとしました。ただし、次のエラーが表示されます。
トレースバック (最後の最後の呼び出し): ファイル "KafkaConsumer.py"、4 行目、消費者のメッセージ: ファイル "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py"、 559行目、次の戻り値の型(self)。next (self) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py"、915 行目、次の return next(self._iterator) ファイル "/usr/local/lib /python2.7/dist-packages/kafka/consumer/group.py"、876 行目、self._fetcher の msg の _message_generator: ファイル "/usr/local/lib/python2.7/dist-packages/kafka/vendor /six.py"、559 行目、次の戻り値の型 (self)。次の(自己)ファイル「/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py」、520行目、 next(self._iterator) ファイル "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py"、477 行目、self._unpack_message_set(tp, messages) の msg の _message_generator を返します。ファイル "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py"、372 行目、_unpack_message_set inner_mset = msg.decompress() ファイル "/usr/local/lib/python2.7 /dist-packages/kafka/protocol/message.py"、121 行目、解凍アサート has_snappy()、「Snappy 解凍がサポートされていません」 AssertionError: Snappy 解凍がサポートされていません
そして、これは私が実行しようとしているコードです:
from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
print("%s:%d%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
私は Kafka を初めて使用するので、自分が間違っていることを理解するのに苦労しています。