6

Kafka からのメッセージを使用する Spark Streaming ジョブを作成しようとしています。これまでに行ったことは次のとおりです。

  1. 飼育係はじめました
  2. Kafkaサーバーを開始しました
  3. サーバーにいくつかのメッセージを送信しました。以下を実行すると、それらを見ることができます。

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. 5 分以内に着信するメッセージの数をカウントするプログラムを作成しようとしています。

コードは次のようになります。

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
        sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

3 番目の引数 (コンシューマー グループ) に使用する値がわからない。これを実行すると、Unable to connect to zookeeper server. しかし、Zookeeper はポートで実行されています2181。そうしないと、ステップ 3 が機能しません。

うまく使えていないようですKafkaUtils.createStream。何か案は?

4

5 に答える 5

2

デフォルトのコンシューマ グループなどはありません。そこで任意の空でない文字列を使用できます。コンシューマが 1 つしかない場合、そのコンシューマ グループはあまり重要ではありません。2 つ以上のコンシューマーが存在する場合、それらは同じコンシューマー グループの一部になることも、異なるコンシューマー グループに属することもできます。

http://kafka.apache.org/documentation.htmlから:

消費者

...

すべてのコンシューマー インスタンスが同じコンシューマー グループを持っている場合、これはコンシューマー間で負荷を分散する従来のキューと同じように機能します。

すべてのコンシューマー インスタンスに異なるコンシューマー グループがある場合、これはパブリッシュ/サブスクライブのように機能し、すべてのメッセージがすべてのコンシューマーにブロードキャストされます。

問題は「topics」パラメーターにあると思います。スパークドキュメントから:

消費する (topic_name -> numPartitions) のマップ。各パーティションは独自のスレッドで消費されます

トピックに単一のパーティション、つまり「1」のみを指定しました。ブローカーの設定 (num.partitions) によっては、複数のパーティションが存在する場合があり、プログラムによって読み取られない他のパーティションにメッセージが送信される場合があります。

また、partitionIds は 0 ベースだと思います。したがって、パーティションが 1 つしかない場合、ID は 0 になります。

于 2014-11-04T07:51:12.203 に答える
0

私は同じ問題に直面していました。これが私のために働いた解決策です。

  • Spark Streaming アプリケーションに割り当てられるコアの数は、レシーバーの数よりも多くする必要があります。そうしないと、システムはデータを受信しますが、処理できません。そのため、Spark ストリーミングには最低 2 つのコアが必要です。したがって、spark-submit では、少なくとも 2 つのコアについて言及する必要があります。
  • kafka-clients-version.jar は、spark-submit の依存 jar のリストに含まれている必要があります。
于 2015-09-28T07:44:39.140 に答える
-2

あなたのコードでは、呼び出し KafkaUtils.createStream の 2 番目の引数は、zookeeper のホストとポートではなく、kafka サーバーの host:port である必要があると思います。一度確認してください。

編集: Kafka Utils API ドキュメント

上記の文書によると、それは Zookeeper quorum である必要があります。そのため、Zookeeper のホスト名とポートを使用する必要があります。

zkQuorum Zookeeper クォーラム (ホスト名:ポート、ホスト名:ポート、..)。

于 2015-06-13T11:12:46.813 に答える