5 つのパーティションを持つ Kafka トピックを作成しました。そして、次のようなcreateStreamレシーバーAPIを使用しています。しかし、どういうわけか、1 つのレシーバーだけが入力データを取得しています。残りの受信機は何も処理していません。助けていただけますか?
JavaPairDStream<String, String> messages = null;
if(sparkStreamCount > 0){
// We create an input DStream for each partition of the topic, unify those streams, and then repartition the unified stream.
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
for (int i = 0; i < sparkStreamCount; i++) {
kafkaStreams.add( KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
}
messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
}
else{
messages = KafkaUtils.createStream(jssc, contextVal.getString(KAFKA_ZOOKEEPER), contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
変更を追加した後、次の例外が発生します。
INFO : org.apache.spark.streaming.kafka.KafkaReceiver - Connected to localhost:2181
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopping receiver with message: Error starting receiver 0: java.lang.AssertionError: assertion failed
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver onStop
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Deregistering receiver 0
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:36)
at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:34)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.consumer.TopicCount$class.makeConsumerThreadIdsPerTopic(TopicCount.scala:34)
at kafka.consumer.StaticTopicCount.makeConsumerThreadIdsPerTopic(TopicCount.scala:100)
at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:104)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:198)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver 0
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopping BlockGenerator
INFO : org.apache.spark.streaming.util.RecurringTimer - Stopped timer for BlockGenerator after time 1473964037200
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Waiting for block pushing thread to terminate
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Pushing out the last 0 blocks
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped block pushing thread
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped BlockGenerator
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped
ERROR: org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver with error: java.lang.AssertionError: assertion failed
ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 29.0