Kafka からのメッセージを使用する Spark Streaming ジョブを作成しようとしています。これまでに行ったことは次のとおりです。
- 飼育係はじめました
- Kafkaサーバーを開始しました
サーバーにいくつかのメッセージを送信しました。以下を実行すると、それらを見ることができます。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
5 分以内に着信するメッセージの数をカウントするプログラムを作成しようとしています。
コードは次のようになります。
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
3 番目の引数 (コンシューマー グループ) に使用する値がわからない。これを実行すると、Unable to connect to zookeeper server
. しかし、Zookeeper はポートで実行されています2181
。そうしないと、ステップ 3 が機能しません。
うまく使えていないようですKafkaUtils.createStream
。何か案は?