1

5 つのパーティションを持つ Kafka トピックを作成しました。そして、次のようなcreateStreamレシーバーAPIを使用しています。しかし、どういうわけか、1 つのレシーバーだけが入力データを取得しています。残りの受信機は何も処理していません。助けていただけますか?

JavaPairDStream<String, String> messages = null;

            if(sparkStreamCount > 0){
                // We create an input DStream for each partition of the topic, unify those streams, and then repartition the unified stream.
                List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
                for (int i = 0; i < sparkStreamCount; i++) {
                                kafkaStreams.add( KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
                }
                messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
            }
            else{
                messages =  KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
            }

MultipleReceiver を使用した Spark UI

変更を追加した後、次の例外が発生します。

INFO : org.apache.spark.streaming.kafka.KafkaReceiver - Connected to localhost:2181
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopping receiver with message: Error starting receiver 0: java.lang.AssertionError: assertion failed
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver onStop
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Deregistering receiver 0
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:36)
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:34)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.consumer.TopicCount$class.makeConsumerThreadIdsPerTopic(TopicCount.scala:34)
    at kafka.consumer.StaticTopicCount.makeConsumerThreadIdsPerTopic(TopicCount.scala:100)
    at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:104)
    at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:198)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver 0
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopping BlockGenerator
INFO : org.apache.spark.streaming.util.RecurringTimer - Stopped timer for BlockGenerator after time 1473964037200
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Waiting for block pushing thread to terminate
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Pushing out the last 0 blocks
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped block pushing thread
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped BlockGenerator
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
ERROR: org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver with error: java.lang.AssertionError: assertion failed
ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 29.0 
4

1 に答える 1

0

上記のコードには 1 つの問題があります。メソッドのkafkaTopicMapパラメーターは指定します。KafkaUtils.createStreamMap of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

以下のコードを試してください。

JavaPairDStream<String, String> messages = null;
int sparkStreamCount = 5;
Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>();
if (sparkStreamCount > 0) {

    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaTopicMap.put(topic, i+1);
        kafkaStreams.add(KafkaUtils.createStream(streamingContext, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }

    messages = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

} else {
    messages = KafkaUtils.createStream(streamingContext, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
于 2016-09-15T03:04:58.130 に答える