Java アプリケーションで SparkSQL を使用して、解析に Databricks を使用して CSV ファイルを処理しています。
私が処理しているデータはさまざまなソース (リモート URL、ローカル ファイル、Google Cloud Storage) からのものであり、データがどこから来たのかを知らなくてもデータを解析および処理できるように、すべてを InputStream に変換する習慣があります。
Sparkで見たすべてのドキュメントは、パスからファイルを読み取ります。
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
そして、私がやりたいのは、InputStream、または既にメモリ内の文字列から読み取ることです。次のようなもの:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
ここに欠けている簡単なものはありますか?
Spark Streaming とカスタム レシーバーに関するドキュメントを少し読みましたが、私が知る限り、これは継続的にデータを提供する接続を開くためのものです。Spark Streaming は、データをチャンクに分割し、それに対して何らかの処理を行っているように見えます。より多くのデータが終わりのないストリームに入ることを期待しています。
私の推測では、Hadoop の子孫である Spark は、おそらくどこかのファイルシステムに存在する大量のデータを想定しています。しかし、Spark はメモリ内で処理を行うため、SparkSQL が既にメモリ内にあるデータを解析できることは理にかなっています。
どんな助けでも大歓迎です。