それぞれがユーザーに属するメッセージのリスト (JSON 形式) を取得し、各ユーザーのメッセージをカウントし、上位 10 人のユーザーを出力する単純なストリーム処理 Spark ジョブを作成しようとしています。
ただし、削減されたカウントをソートするために Comparator> を定義すると、java.io.NotSerializableExceptionがスローされてすべてが失敗します。
Spark に対する私の依存関係:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
<version>0.8.0-incubating</version>
私が使用しているJavaコード:
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext("local", "spark");
JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache();
JavaPairRDD<String, Long> words = lines
.map(new Function<String, JsonElement>() {
// parse line into JSON
@Override
public JsonElement call(String t) throws Exception {
return (new JsonParser()).parse(t);
}
}).map(new Function<JsonElement, String>() {
// read User ID from JSON
@Override
public String call(JsonElement json) throws Exception {
return json.getAsJsonObject().get("userId").toString();
}
}).map(new PairFunction<String, String, Long>() {
// count each line
@Override
public Tuple2<String, Long> call(String arg0) throws Exception {
return new Tuple2(arg0, 1L);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
// count messages for every user
@Override
public Long call(Long arg0, Long arg1) throws Exception {
return arg0 + arg1;
}
});
// sort result in a descending order and take 10 users with highest message count
// This causes the exception
List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>> (){
@Override
public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
return -1 * o1._2().compareTo(o2._2());
}
});
// print result
for (Tuple2<String, Long> tuple : sorted) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}
結果のスタック トレース:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
Spark API のドキュメントを調べましたが、正しい方向性を示すものは何も見つかりませんでした。何か間違ったことをしていますか、それとも Spark のバグですか? どんな助けでも喜んでいただければ幸いです。