63

簡単にするために、次のようなCassandraテーブルがあります。

key: text
jsonData: text
blobData: blob

spark と spark-cassandra-connector を使用して、このための基本的なデータ フレームを作成できます。

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

JSONデータをその基礎となる構造に拡張するのに苦労しています。最終的には、json 文字列内の属性に基づいてフィルター処理し、ブロブ データを返すことができるようにしたいと考えています。jsonData.foo = "bar" のようなもので、blobData を返します。これは現在可能ですか?

4

5 に答える 5

100

火花 >= 2.4

必要に応じて、schema_of_json関数を使用してスキーマを決定できます (これは、任意の行がスキーマの有効な代表であると想定していることに注意してください)。

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

スパーク >= 2.1

関数を使用できfrom_jsonます:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

火花 >= 1.6

get_json_object列とパスを取る which を使用できます。

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

フィールドを個々の文字列に抽出し、さらに予想される型にキャストできます。

path引数はドット構文を使用して表現され、先頭は$.ドキュメント ルートを示します (上記のコードは文字列補間$を使用するため、エスケープする必要があるため、$$.)。

スパーク <= 1.5 :

これは現在可能ですか?

私の知る限り、それは直接可能ではありません。これに似たものを試すことができます:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

blobフィールドはJSONで表現できないと思います。それ以外の場合は、分割と結合を省略できます。

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

別の (より安価ですが、より複雑な) アプローチは、UDF を使用して JSON を解析し、structまたはmap列を出力することです。たとえば、次のようなものです。

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)
于 2015-12-03T15:36:02.013 に答える