0

次のコードを使用して、カフカからメッセージを取得しています

スカラコード:

val lines: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
 zookeeperQuorum, consumerGroup, topicMap)
lines.print(10)

これが私のサンプル プロデューサー コードです。

    from kafka import SimpleProducer, KafkaClient
    import time
    # To send messages synchronously
    kafka = KafkaClient(serverip+':'+port)
    producer = SimpleProducer(kafka)
    kafka.ensure_topic_exists('test')

    kafka.ensure_topic_exists('test1')

    while(1):
     print "sending message "
     producer.send_messages(b'test', 'test,msg')
     time.sleep(2)
     producer.send_messages(b'test1', 'test1,msg')
     time.sleep(2)

ストリーミング レシーバーの出力

(null,'test,msg')
(null,'test1,msg')

質問:

1) How can I differentiate msg per topic level without actually
decoding the message ?

2) Why it is giving me null in the output ? From the documentation
it says key,value tuple. How can I create key,value tuple kind of
message ?

編集:keyedProducerを使用

kafka = KafkaClient(serverip+':'+port)
producer = KeyedProducer(kafka)

kafka.ensure_topic_exists('test2')

while(1):
   print "sending msg "
   producer.send_messages(b'test2',b'key1','msg')
   time.sleep(2)

これは私にエラーを投げています

raise PartitionUnavailableError("%s not available" % str(key))                                                                                                                            
kafka.common.PartitionUnavailableError: TopicAndPartition(topic='test2', partition='key1') not available   
4

2 に答える 2

1

#1の場合、最も簡単なのは、トピックごとに個別のストリームを作成することです。任意の時点でそれらを組み合わせる必要があり、それらが同じ構造を持っている場合-それらを結合できます

#2については、 KeyedProducerを使用してみましたか?

上記のリンクからのスニペット:

producer = KeyedProducer(kafka)
producer.send_messages(b'my-topic', b'key1', b'some message')
producer.send_messages(b'my-topic', b'key2', b'this methode')
于 2016-01-11T16:43:11.600 に答える