3

データベースから Spark に履歴データをロードし、Spark に新しいストリーミング データを追加し続けたいという Spark のユース ケースがあり、その後、最新のデータセット全体で分析を行うことができます。

私の知る限り、Spark SQL も Spark Streaming も、履歴データをストリーミング データと組み合わせることができません。次に、この問題のために構築されていると思われるSpark 2.0のStructured Streamingを見つけました。しかし、いくつかの実験の後、私はまだそれを理解することができません. ここに私のコードがあります:

SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);


// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 11111)
        .load();

// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
        .as(Encoders.STRING())
        .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
    @Override
    public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
    ...
    }
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data

ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();

「org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;」というエラーが表示されました。2 つのデータセットを union() すると。

誰でも私を助けてもらえますか?私は間違った方向に進んでいますか?

4

1 に答える 1

1

このタイプの機能をサポートするという点で、MongoDB Spark コネクタについて話すことはできません。また、Google にもそれについてあまり情報がないようです。ただし、Spark データベース エコシステムには他のデータベースもあります。別の回答で、Spark データベース エコシステムにあるもののほとんどを取り上げました。SnappyDataMemSQLがそのリストに含まれていることは知っていますが、探しているタイプの機能を簡単に許可するデータベースを正確に言うことはできません。ただし、両方のリレーショナル形式のデータが必要になる場合があります。

于 2016-10-03T21:48:45.633 に答える