5

Dataset<Row>json 文字列の単一の列を持つ:

+--------------------+
|               value|
+--------------------+
|{"Context":"00AA0...|
+--------------------+

Json サンプル:

{"Context":"00AA00AA","MessageType":"1010","Module":"1200"}

次のように最も効率的に取得するにはどうすればよいですかDataset<Row>

+--------+-----------+------+
| Context|MessageType|Module|
+--------+-----------+------+
|00AA00AA|       1010|  1200|
+--------+-----------+------+

これらのデータをストリームで処理しています。ファイルから読み取っているときに、spark が自分でこれを実行できることを知っています。

spark
.readStream()
.schema(MyPojo.getSchema())
.json("src/myinput")

しかし今、私はカフカからデータを読んでおり、別の形式でデータを提供しています。私は Gson のようないくつかのパーサーを使用できることを知っていますが、spark にそれをさせたいと思っています。

4

1 に答える 1

1

このサンプルを試してください。

public class SparkJSONValueDataset {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkJSONValueDataset")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local")
                .getOrCreate();

        //Prepare data Dataset<Row>
        List<String> data = Arrays.asList("{\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"}");
        Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value");
        df.show();

        //convert to Dataset<String> and Read
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = spark.read().json(df1.javaRDD());
        df2.show();
        spark.stop();
    }
 }
于 2016-11-22T10:44:34.303 に答える