最終的に SparkSQL を使用してクエリを実行できるように、いくつかの圧縮された csv ファイルを DataFrame に取り込もうとしています。私は通常 sc.textFile() を使用してファイルを消費し、さまざまな map() 変換を使用してデータを解析および変換しますが、問題のファイルには解析が難しい値がいくつかあります。特に、内部にコンマを含む引用符でカプセル化された値があり、map() 変換内で split() 関数を使用するオプションが壊れています。
これが私がやっていることです:
spark-csv および commons-csv jar を使用して spark を起動します
PYSPARK_PYTHON=python2.7 sudo pyspark --jars "spark-csv_2.10-1.0.0.jar,commons-csv-1.1.jar"
私のcsvにはヘッダーがないため、スキーマ変数を作成してから、以下の呼び出しを行います
sqlc = SQLContext(sc)
apps_df = sqlc.read.format("com.databricks.spark.csv").options(header="false",codec="org.apache.hadoop.io.compress.GzipCodec").load("s3://path_to_file.csv.gz", schema = customSchema)
これは、apps_df.printSchema() を使用すると正しいスキーマを持つ DataFrame オブジェクトを返しますが、apps_df.count() は 0 を返し、apps_df.first() は何も返しません。
編集:
これが私の、うまくいけば、再現可能な例です
full_filepathをディレクトリ内の .csv ファイルに置き換えます
full_gzip_filepathをディレクトリ内の csv ファイルの .gz バージョンに置き換えます
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlc = SQLContext(sc)
import pandas as pd
import numpy as np
from subprocess import check_call
columns = ['A','B', 'C']
data = np.array([np.arange(10)]*3).T
df = pd.DataFrame(data, columns=columns)
df.to_csv('full_filepath')
check_call(['gzip', 'full_filepath'])
test_scsv_df = sqlc.read.format("com.databricks.spark.csv").options(header="true",inferSchema="true",codec="org.apache.hadoop.io.compress.GzipCodec").load("full_gzip_filepath")
test_scsv_df.show()
これは以下を返します:
+---+---+---+---+
| | A| B| C|
+---+---+---+---+
+---+---+---+---+
次のいくつかのコマンドも実行すると、ファイルが pandas を介して適切に消費されることがわかります。
test_pd = pd.read_csv('full_gzip_filepath', sep=',', compression='gzip', quotechar='"', header=0)
test_pd_df = sqlc.createDataFrame(test_pd)
test_pd_df.show()
これは以下を返します:
+----------+---+---+---+
|Unnamed: 0| A| B| C|
+----------+---+---+---+
| 0| 0| 0| 0|
| 1| 1| 1| 1|
| 2| 2| 2| 2|
| 3| 3| 3| 3|
| 4| 4| 4| 4|
| 5| 5| 5| 5|
| 6| 6| 6| 6|
| 7| 7| 7| 7|
| 8| 8| 8| 8|
| 9| 9| 9| 9|
+----------+---+---+---+