10

ElasticSearch にロードされたいくつかのテスト データを使用して、ElasticSearch と Spark の統合をローカル マシンでテストしていました。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

コードは正常に実行され、正しい結果が esRDD.first() で正常に返されます

ただし、esRDD.collect() は例外を生成します。

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

これはhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.htmlで言及されている問題に関連していると思われる ので、それに応じてこの行を追加しました

conf.set("spark.serializer", classOf[KryoSerializer].getName)

それを機能させるために何か他のことをすることになっていますか?ありがとうございました


更新: シリアル化のセットアップの問題が解決されました。を使用して

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

それ以外の

conf.set("spark.serializer", classOf[KryoSerializer].getName)

このデータセットには 1000 の個別のレコードがあります

esRDD.count()

1000 を返しても問題ありませんが、

esRDD.distinct().count()

5 を返します。レコードを印刷する場合

esRDD.foreach(println)

1000 レコードが正しく印刷されます。しかし、コレクトまたはテイクを使用すると

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

DUPLICATED レコードを出力しますが、実際には 5 つの UNIQUE レコードしか表示されません。これは、データセット全体のランダムなサブセットのようです。最初の 5 つのレコードではありません。RDDを保存して読み返すと

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2 は期待どおりに動作します。コレクト/テイクの動作について、バグなのか、わからないところがあるのでしょうか。それとも、すべてをローカルで実行しているためですか。"spark-output" ファイルの part-xxxx ファイルの数に示されているように、Spark RDD はデフォルトで 5 つのパーティションを使用しているようです。これがおそらく、esRDD.collect() と esRDD.distinct() が他の乱数ではなく、5 つの一意のレコードを返した理由です。しかし、それはまだ正しくありません。

4

2 に答える 2

1

初期化するには、次のコードを使用する必要があります。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local").set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
于 2014-08-12T13:11:26.870 に答える