次のコードを使用して、カフカからメッセージを取得しています
スカラコード:
val lines: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
zookeeperQuorum, consumerGroup, topicMap)
lines.print(10)
これが私のサンプル プロデューサー コードです。
from kafka import SimpleProducer, KafkaClient
import time
# To send messages synchronously
kafka = KafkaClient(serverip+':'+port)
producer = SimpleProducer(kafka)
kafka.ensure_topic_exists('test')
kafka.ensure_topic_exists('test1')
while(1):
print "sending message "
producer.send_messages(b'test', 'test,msg')
time.sleep(2)
producer.send_messages(b'test1', 'test1,msg')
time.sleep(2)
ストリーミング レシーバーの出力
(null,'test,msg')
(null,'test1,msg')
質問:
1) How can I differentiate msg per topic level without actually
decoding the message ?
2) Why it is giving me null in the output ? From the documentation
it says key,value tuple. How can I create key,value tuple kind of
message ?
編集:keyedProducerを使用
kafka = KafkaClient(serverip+':'+port)
producer = KeyedProducer(kafka)
kafka.ensure_topic_exists('test2')
while(1):
print "sending msg "
producer.send_messages(b'test2',b'key1','msg')
time.sleep(2)
これは私にエラーを投げています
raise PartitionUnavailableError("%s not available" % str(key))
kafka.common.PartitionUnavailableError: TopicAndPartition(topic='test2', partition='key1') not available