4

外部ソースからデータを取得する JavaDStream があります。Spark Streaming と SparkSQL を統合しようとしています。JavaDStream は JavaRDD の . また、JavaRDD がある場合にのみ applySchema() 関数を適用できます。それをJavaRDDに変換するのを手伝ってください。私はscalaに関数があることを知っています、そしてそれははるかに簡単です。しかし、Javaで私を助けてください。

4

3 に答える 3

1

最初に forEachRDD を次のように使用して、DStream 内のすべての RDD にアクセスする必要があります。

javaDStream.foreachRDD( rdd => {
    rdd.collect.foreach({
        ...
    })
})
于 2015-09-29T01:49:08.120 に答える
0

これが JavaDstream を JavaRDD に変換するのに役立つことを願っています!

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    //Create JavaRDD<Row>
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                @Override
                public Row call(String msg) {
                    Row row = RowFactory.create(msg);
                    return row;
                }
            });
            //Create Schema
            StructType schema = DataTypes.createStructType(new StructField[] {
                    DataTypes.createStructField("value", DataTypes.StringType, true)});
            //Get Spark 2.0 session
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();
于 2019-08-01T15:55:22.090 に答える