Hi, I created a Kafka topic with 3 partitions and 2 replicas. Some process publishes messages/records to Kafka, and I am consuming them with Spark Streaming and trying to save the data to HDFS. I tried to save the pair RDD as a text file, but it fails.
This code does not work:
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils
        .createDirectStream(ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams,
                topics);
directKafkaStream.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        rdd.saveAsTextFile(path);
    }
});
Console output:
17/01/09 17:25:39 INFO KafkaRDD: Computing topic filebeat, partition 1 offsets 20 -> 32
17/01/09 17:25:39 INFO VerifiableProperties: Verifying properties
17/01/09 17:25:39 INFO VerifiableProperties: Property group.id is overridden to
17/01/09 17:25:39 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
17/01/09 17:25:39 INFO KafkaRDD: Computing topic filebeat, partition 0 offsets 22 -> 34
17/01/09 17:25:39 INFO VerifiableProperties: Verifying properties
17/01/09 17:25:39 INFO VerifiableProperties: Property group.id is overridden to
17/01/09 17:25:39 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
17/01/09 17:25:40 INFO JobScheduler: Added jobs for time 1483979140000 ms
17/01/09 17:25:40 ERROR Utils: Aborting task
java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:65)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$18.hasNext(Iterator.scala:764)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1203)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.KafkaLZ4BlockOutputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 22 more
17/01/09 17:25:40 ERROR Utils: Aborting task
For reference, the relevant dependencies in my pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
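To narrow down the NoClassDefFoundError, a small probe like the one below could be run with the same classpath as the streaming job. It is only a diagnostic sketch: the class name `ClasspathProbe` and the helper `locate` are hypothetical, and the second class name in the list (the `org.apache.kafka.common.record` package) is my assumption about where newer kafka-clients releases keep this class, so treat it as something to verify rather than a fact.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Spark or Kafka.
class ClasspathProbe {

    /** Returns the jar/directory a class is loaded from, or a marker if it cannot be loaded. */
    static String locate(String className) {
        try {
            // Load without running static initializers; we only want the location.
            Class<?> cls = Class.forName(className, false,
                    ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            // The class the stack trace says is missing.
            "org.apache.kafka.common.message.KafkaLZ4BlockOutputStream",
            // Assumed location of the same class in newer kafka-clients (to be verified).
            "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream",
            // The Kafka core class that triggers the lookup in the stack trace.
            "kafka.message.ByteBufferMessageSet"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the first name prints NOT FOUND while `kafka.message.ByteBufferMessageSet` resolves to a jar, that would suggest the Kafka core classes on the classpath were built against a different kafka-clients version than the 0.9.0.0 declared in the pom. Running `mvn dependency:tree` should show which Kafka versions actually end up on the classpath.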