14

Java アプリケーションで SparkSQL を使用して、解析に Databricks を使用して CSV ファイルを処理しています。

私が処理しているデータはさまざまなソース (リモート URL、ローカル ファイル、Google Cloud Storage) からのものであり、データがどこから来たのかを知らなくてもデータを解析および処理できるように、すべてを InputStream に変換する習慣があります。

Sparkで見たすべてのドキュメントは、パスからファイルを読み取ります。

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);

DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");

DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");

dfGrouped.show();

そして、私がやりたいのは、InputStream、または既にメモリ内の文字列から読み取ることです。次のようなもの:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);

String someString = "imagine,some,csv,data,here";

DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);

ここに欠けている簡単なものはありますか?

Spark Streaming とカスタム レシーバーに関するドキュメントを少し読みましたが、私が知る限り、これは継続的にデータを提供する接続を開くためのものです。Spark Streaming は、データをチャンクに分割し、それに対して何らかの処理を行っているように見えます。より多くのデータが終わりのないストリームに入ることを期待しています。

私の推測では、Hadoop の子孫である Spark は、おそらくどこかのファイルシステムに存在する大量のデータを想定しています。しかし、Spark はメモリ内で処理を行うため、SparkSQL が既にメモリ内にあるデータを解析できることは理にかなっています。

どんな助けでも大歓迎です。

4

1 に答える 1

4

生活を楽にするために、少なくとも 4 つの異なるアプローチを使用できます。

  1. 入力ストリームを使用し、ローカル ファイルに書き込み (SSD で高速)、Spark で読み取ります。

  2. S3、Google Cloud Storage用の Hadoop ファイル システム コネクタを使用して、すべてをファイル操作に変換します。(このための HDFS コネクタがないため、任意の URL からの読み取りに関する問題は解決しません。)

  3. さまざまな入力タイプをさまざまな URI として表し、URI を検査して適切な読み取り操作をトリガーするユーティリティ関数を作成します。

  4. (3) と同じですが、URI の代わりにケース クラスを使用し、入力タイプに基づいて単純にオーバーロードします。

于 2016-07-25T20:08:43.747 に答える