MySQL からデータを読み取り、Spark を使用して Elasticsearch に保存する簡単なジョブを設計しました。
コードは次のとおりです。
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
コードが非常に単純であることがわかります。データを DataFrame に読み取り、いくつかの列を選択してから、Dataframe でcount
基本的なアクションとして実行します。この時点まではすべて正常に動作します。
次に、データを Elasticsearch に保存しようとしますが、一部のタイプを処理できないために失敗します。ここでエラー ログを確認できます。
なぜそのタイプを処理できないのかわかりません。なぜこれが起こっているのか誰にも分かりますか?
Apache Spark 1.5.0、Elasticsearch 1.4.4、elaticsearch-hadoop 2.1.1 を使用しています
編集:
- ソースコードとともにサンプルデータセットで要点リンクを更新しました。
- また、メーリング リストで @costin が言及しているように、elasticsearch -hadoop dev ビルドを使用しようとしました。