ElasticSearch にロードされたいくつかのテスト データを使用して、ElasticSearch と Spark の統合をローカル マシンでテストしていました。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
コードは正常に実行され、正しい結果が esRDD.first() で正常に返されます
ただし、esRDD.collect() は例外を生成します。
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
これはhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.htmlで言及されている問題に関連していると思われる ので、それに応じてこの行を追加しました
conf.set("spark.serializer", classOf[KryoSerializer].getName)
それを機能させるために何か他のことをすることになっていますか?ありがとうございました
更新: シリアル化のセットアップの問題が解決されました。を使用して
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
それ以外の
conf.set("spark.serializer", classOf[KryoSerializer].getName)
このデータセットには 1000 の個別のレコードがあります
esRDD.count()
1000 を返しても問題ありませんが、
esRDD.distinct().count()
5 を返します。レコードを印刷する場合
esRDD.foreach(println)
1000 レコードが正しく印刷されます。しかし、コレクトまたはテイクを使用すると
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
DUPLICATED レコードを出力しますが、実際には 5 つの UNIQUE レコードしか表示されません。これは、データセット全体のランダムなサブセットのようです。最初の 5 つのレコードではありません。RDDを保存して読み返すと
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 は期待どおりに動作します。コレクト/テイクの動作について、バグなのか、わからないところがあるのでしょうか。それとも、すべてをローカルで実行しているためですか。"spark-output" ファイルの part-xxxx ファイルの数に示されているように、Spark RDD はデフォルトで 5 つのパーティションを使用しているようです。これがおそらく、esRDD.collect() と esRDD.distinct() が他の乱数ではなく、5 つの一意のレコードを返した理由です。しかし、それはまだ正しくありません。