5

MySQL からデータを読み取り、Spark を使用して Elasticsearch に保存する簡単なジョブを設計しました。

コードは次のとおりです。

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

コードが非常に単純であることがわかります。データを DataFrame に読み取り、いくつかの列を選択してから、Dataframe でcount基本的なアクションとして実行します。この時点まではすべて正常に動作します。

次に、データを Elasticsearch に保存しようとしますが、一部のタイプを処理できないために失敗します。ここでエラー ログを確認できます。

なぜそのタイプを処理できないのかわかりません。なぜこれが起こっているのか誰にも分かりますか?

Apache Spark 1.5.0、Elasticsearch 1.4.4、elaticsearch-hadoop 2.1.1 を使用しています

編集:

  • ソースコードとともにサンプルデータセットで要点リンクを更新しました。
  • また、メーリング リストで @costin が言及しているように、elasticsearch -hadoop dev ビルドを使用しようとしました。
4

1 に答える 1

15

これに対する答えは難しいものでしたが、 samklrのおかげで、問題の原因を突き止めることができました。

それにもかかわらず、解決策は簡単ではなく、いくつかの「不要な」変換を考慮する可能性があります。

まず、シリアライゼーションについて話しましょう。

データの Spark シリアライゼーションと関数のシリアライゼーションで考慮すべきシリアライゼーションの 2 つの側面があります。この場合、それはデータのシリアル化と逆シリアル化に関するものです。

Spark の観点からは、シリアライゼーションの設定だけが必要です。Spark はデフォルトで Java シリアライゼーションに依存していますが、これは便利ですがかなり非効率的です。これが、Hadoop 自体が独自のシリアライゼーション メカニズムと独自の型、つまり を導入した理由Writablesです。そのため、InputFormatそのままでは Spark が理解できないものOutputFormatsを返す必要があります。Writables

Elasticsearch-spark コネクタでは、変換を自動的に処理し、これも非常に効率的に行う別のシリアル化 (Kryo) を有効にする必要があります。

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Kryo では、シリアル化する特定のインターフェイスをクラスに実装する必要がないため、Kryo のシリアル化を有効にする以外の作業を行わなくても、RDD で POJO を使用できます。

そうは言っても、@samklr は、Kryo はクラスを使用する前にクラスを登録する必要があることを指摘してくれました。

これは、Kryo がシリアル化されるオブジェクトのクラスへの参照を書き込むためです (書き込まれるオブジェクトごとに 1 つの参照が書き込まれます)。これは、クラスが登録されている場合は単なる整数識別子ですが、それ以外の場合は完全なクラス名です。Spark は、ユーザーに代わって Scala クラスと他の多くのフレームワーク クラス (Avro Generic または Thrift クラスなど) を登録します。

クラスを Kryo に登録するのは簡単です。KryoRegistrator のサブクラスを作成し、registerClasses()メソッドをオーバーライドします。

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame            
        kryo.register(Product.class); 
    }
}

最後に、ドライバー プログラムで、spark.kryo.registrator プロパティを KryoRegistrator 実装の完全修飾クラス名に設定します。

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

第 2 に、Kryo シリアライザーが設定され、クラスが登録され、Spark 1.5 に変更が加えられているにもかかわらず、何らかの理由で Elasticsearch はデータフレームをコネクタに推論できないため、データフレームを逆シリアルSchemaType化できませんでした。

そのため、Dataframe を JavaRDD に変換する必要がありました

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});

これで、データを elasticsearch に書き込む準備ができました。

JavaEsSpark.saveToEs(products, "test/test");

参考文献:

  • Elasticsearch の Apache Spark サポートドキュメント
  • Hadoop Definitive Guide、第 19 章。Spark 編。4 - トム・ホワイト。
  • ユーザーsamklr
于 2015-10-09T15:42:51.420 に答える