// Order metrics: CSV file on S3, read with the predefined `revenue_schema`.
val ordersDF = spark.read
  .format("csv")
  .schema(revenue_schema)
  .load("s3://xxxx/fifa/pocs/smallMetrics.csv")

// Product catalogue: JSON file on S3; its columns are renamed to id / product / style_id.
val product_df = spark.read
  .json("s3://xxxx/fifa/pocs/smallCatalogue.json")
  .toDF("id", "product", "style_id")

// Serialize the `product` column to JSON text so that single fields can be
// extracted from it with JSON path expressions below.
val product_json_df = product_df.select($"style_id", to_json($"product").alias("product"))

// (field name inside `product`, name of the resulting output column), in output order.
val productFieldAliases = Seq(
  "brand" -> "brand",
  "gender" -> "gender",
  "article_type" -> "article_type",
  "business_unit" -> "business_unit",
  "season" -> "season",
  "season_code" -> "season_code",
  "brand_code" -> "brand_code",
  "style_catalogued_date" -> "style_catalogued_date",
  "base_colour" -> "base_colour",
  "image" -> "image",
  "image_array" -> "image_array",
  "MRP" -> "mrp",
  "attrs" -> "product_attributes"
)

// One extracted column per catalogue field; `$$` is a literal `$` in the JSON path.
val productFieldColumns = productFieldAliases.map { case (field, alias) =>
  get_json_object($"product", s"$$.$field").alias(alias)
}

// Flattened catalogue: style_id followed by the extracted fields.
val product_final_df = product_json_df.select($"style_id" +: productFieldColumns: _*)
product_final_df.show(false)
|style_id|brand |gender|article_type|business_unit |season|season_code|brand_code|style_catalogued_date|base_colour|image|image_array |mrp |product_attributes |
+--------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270 |Marks & Spencer|Women |Jeans |International Brands|Fall |FW17 |MKSP |null |Khaki |null |[null,null,null,null,null,null,null]|2299|{"ALL":"STYLES","Add-Ons":"NA","Brand Fit Name":"NA","Closure":"Button and Zip","Distress":"Clean Look","Fabric":"Cotton","Fade":"No Fade","Features":"NA","Fit":"Super Skinny Fit","Occasion":"Casual","Shade":"Dark","Waist Rise":"Mid-Rise","Waistband":"With belt loops"}|
|2023709 |Bossini |Boys |Tshirts |Kids Wear |Fall |FW17 |BILE |null |NA |null |[null,null,null,null,null,null,null]|599 |{"ALL":"STYLES","Fabric":"Polyester","Fabric Type":"Single jersey","Fit":"Regular Fit","Multipack Set":"Single","Neck":"Henley Neck","Pattern":"Solid","Pattern Coverage":"NA","Print or Pattern Type":"Solid","Sleeve Length":"Long Sleeves","Surface Styling":"NA"} |
|2024333 |Marks & Spencer|Women |Tops |International Brands|Fall |FW17 |MKSP |null |null |null |[null,null,null,null,null,null,null]|1999|{"ALL":"STYLES","Fabric":"Polyester","Neck":"Round Neck","Pattern":"Solid","Print or Pattern Type":"Solid","Sleeve Length":"Short Sleeves","Sleeve Styling":"Flared Sleeves","Surface Styling":"NA","Type":"Regular","Weave Type":"Knitted"}
// Join the order metrics with the flattened catalogue on their shared `style_id` column.
val product_metrics_df = ordersDF.join(product_final_df, Seq("style_id"))
// Print the first 20 rows without truncating long cell values.
product_metrics_df.show(20, false)

|style_id|date |mrp |revenue|quantity|product_discount|coupon_discount|total_discount|list_count|add_to_cart_count|pdp_count|brand |gender|article_type|business_unit |season|season_code|brand_code|style_catalogued_date|base_colour|image|image_array |product_attributes |
+--------+--------+------+-------+--------+----------------+---------------+--------------+----------+-----------------+---------+---------------+------+------------+--------------------+------+-----------+----------+---------------------+-----------+-----+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2024270 |20170101|1000.0|1000.0 |1000 |1000.0 |1000.0 |1000.0 |1000 |2000 |2000 |Marks & Spencer|Women |Jeans |International Brands|Fall |FW17 |MKSP |null |Khaki |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Add-Ons":"NA","Brand Fit Name":"NA","Closure":"Button and Zip","Distress":"Clean Look","Fabric":"Cotton","Fade":"No Fade","Features":"NA","Fit":"Super Skinny Fit","Occasion":"Casual","Shade":"Dark","Waist Rise":"Mid-Rise","Waistband":"With belt loops"}|
|2024333 |20170101|1000.0|1000.0 |1000 |1000.0 |1000.0 |1000.0 |1000 |2000 |2000 |Marks & Spencer|Women |Tops |International Brands|Fall |FW17 |MKSP |null |null |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Fabric":"Polyester","Neck":"Round Neck","Pattern":"Solid","Print or Pattern Type":"Solid","Sleeve Length":"Short Sleeves","Sleeve Styling":"Flared Sleeves","Surface Styling":"NA","Type":"Regular","Weave Type":"Knitted"} |
|2023709 |20170101|1000.0|1000.0 |1000 |1000.0 |1000.0 |1000.0 |1000 |2000 |2000 |Bossini |Boys |Tshirts |Kids Wear |Fall |FW17 |BILE |null |NA |null |[null,null,null,null,null,null,null]|{"ALL":"STYLES","Fabric":"Polyester","Fabric Type":"Single jersey","Fit":"Regular Fit","Multipack Set":"Single","Neck":"Henley Neck","Pattern":"Solid","Pattern Coverage":"NA","Print or Pattern Type":"Solid","Sleeve Length":"Long Sleeves","Surface Styling":"NA"} |

// Write the joined DataFrame to Elasticsearch using the settings in `elasticConf`.
// NOTE(review): `product_attributes` is a string column at this point (it was produced
// by get_json_object), so it is presumably sent to ES as a string value rather than as
// a nested object — confirm against the index mapping.
product_metrics_df.saveToEs(elasticConf)
product_attributes 列を ES に書き込むと、
二重引用符がバックスラッシュでエスケープされてしまいます。
product_attributes "{\"ALL\":\"STYLES\",\"Add-Ons\":\"NA\",\"Brand Fit Name\":\"NA\",\"Closure\":\"Button and Zip\",\"Distress\":\"Clean Look\",\"Fabric\":\"Cotton\",\"Fade\":\"No Fade\",\"Features\":\"NA\",\"Fit\":\"Super Skinny Fit\",\"Occasion\":\"Casual\",\"Shade\":\"Dark\",\"Waist Rise\":\"Mid-Rise\",\"Waistband\":\"With belt loops\"}"
JSON がバックスラッシュでエスケープされないようにする方法はありますか? エスケープされた値は有効な JSON オブジェクトではないため、ES はそれを単一の文字列フィールドとして解釈しており、product_attributes の下のキーと値のペアはどれも個別にインデックス化されていません。
product_attributes データがエスケープされているかどうかをクロスチェックするために、データフレームを S3 に書き込みました。
// Dump the joined DataFrame to S3 as JSON so the serialized form of product_attributes
// can be inspected outside Elasticsearch.
// NOTE(review): the target path ends in ".csv" although the output format is JSON.
product_metrics_df.write.json("s3://xxxxx/fifa/pocs/output.csv")
ES インデックス テンプレート: https://pastebin.com/e4tmATHE
Spark と Python を使用した場合は問題なくデータを ES に書き込めるため、ES インデックス テンプレート自体に問題はないはずです。
そして、json4sライブラリを使用してjsonを構築し、jsonをESに書き込む別の方法を試しましたが、ここでも同じ問題に直面しています
// Body of the row-to-JSON converter that is passed to `map` as `convertRowToJSON`.
// NOTE(review): the opening `def` line is not part of this excerpt; `row` is its parameter.
// Columns are read by position, so this depends on the column order of the joined DataFrame.

// `product_attributes` holds already-serialized JSON text. Putting it into the document as
// a plain String makes json4s render it as a JSON *string*, which escapes every inner quote
// ("{\"ALL\":\"STYLES\",...}") and leads Elasticsearch to index it as one string field.
// Parsing the text first embeds it as a nested JSON value instead.
//   - a null cell becomes JSON null
//   - text that cannot be parsed is kept as a JSON string
// `parse` comes from the same json4s `JsonMethods` object that provides `compact`/`render`.
def embeddedJson(text: String): org.json4s.JValue =
  Option(text).fold[org.json4s.JValue](org.json4s.JNull) { raw =>
    scala.util.Try(parse(raw)).getOrElse(org.json4s.JString(raw))
  }

val json =
(
("style_id" -> row.getInt(0)) ~
("date" -> row.getInt(1)) ~
("mrp" -> row.getFloat(2)) ~
("revenue" -> row.getFloat(3)) ~
("quantity" -> row.getInt(4)) ~
("product_discount" -> row.getFloat(5)) ~
("coupon_discount" -> row.getFloat(6)) ~
("total_discount" -> row.getFloat(7)) ~
("list_count" -> row.getInt(8)) ~
("add_to_cart_count" -> row.getInt(9)) ~
("pdp_count" -> row.getInt(10)) ~
("brand" -> row.getString(11)) ~
("gender" -> row.getString(12)) ~
("article_type" -> row.getString(13)) ~
("business_unit" -> row.getString(14)) ~
("season" -> row.getString(15)) ~
("season_code" -> row.getString(16)) ~
("brand_code" -> row.getString(17)) ~
("style_catalogued_date" -> row.getString(18)) ~
("base_colour" -> row.getString(19)) ~
("image" -> row.getString(20)) ~
("image_array" -> row.getString(21)) ~
// Nested JSON value, not a String: this is what keeps the attributes unescaped.
("product_attributes" -> embeddedJson(row.getString(22)))
)
// Render the document as a single-line JSON string.
compact(render(json)).toString
}
// Join orders with the catalogue on `style_id`, then turn every joined row into a
// JSON string with `convertRowToJSON`.
val product_metrics_df = ordersDF
  .join(product_final_df, Seq("style_id"))
  .map(convertRowToJSON)
JSON の準備ができた後、es.input.json
プロパティを true に設定して試してみましたが、うまくいきませんでした。
saveJsonToEs メソッドも試してみましたが、
うまくいきません。JSON はまだエスケープされており、単一のオブジェクトとして扱われています。
// Send the JSON strings produced by `convertRowToJSON` to Elasticsearch via the
// underlying RDD, using the settings in `elasticConf`.
product_metrics_df.rdd.saveJsonToEs(elasticConf)
ありがとう