1

Message Hub から Bluemix の Spark インスタンスにメッセージをストリーミングしています。Java クライアントを使用してメッセージ ハブに単純な json メッセージを入れています。

JSON メッセージ -

{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"}

Spark でストリーミングを開始すると、受信したメッセージの先頭に余分な null があります。

(null,{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"})

Spark コンテキストがこの null を前に置く理由を教えてください。どうすれば削除できますか?

KafkaSender コード -

  KafkaProducer<String, String> kafkaProducer;
  kafkaProducer = new KafkaProducer<String, String>(props);
  ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);

  RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
  //getting RecordMetadata is possible to validate topic, partition and offset
  System.out.println("topic where message is published : " + recordMetadata.topic());
  System.out.println("partition where message is published : " + recordMetadata.partition());
  System.out.println("message offset # : " + recordMetadata.offset());
  kafkaProducer.close();

ありがとうラージ

4

1 に答える 1

0

あなたのキーは null です - 最初の値はあなたのキーで、2 番目の値はもちろんあなたの値です。

より良い回答を得るために、メッセージを Kafka/MessageHub に投稿するコードを投稿することをお勧めします。

あなたの問題を解決するために - あなたの目標が単にそれを印刷することであれば、代わりにこのようなことを行うことができます。これはデータを標準出力に出力し、null キーを無視します。

stream.foreachRDD(recordRDD => {
  recordRDD.foreach(record => print(record._2))
})
于 2016-08-22T17:59:09.487 に答える