0

ログ ファイルが作成された日付に基づいて、異なるディレクトリにログ ファイルが保存されます。

例えば

> /mypath/2017/01/20/... 
.
.
.
> /mypath/2017/02/13/...
> /mypath/2017/02/14/...

このマスター ファイルで集計を実行できるように、pyspark を使用してこれらすべてのログ ファイルを 1 つの rdd に結合したいと考えています。

今日まで、私は sqlContext と呼ばれる個々のディレクトリを取得し、Union を使用して特定の日付のすべてのログ ファイルを結合しました。

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14"))

日付の範囲からログ ファイルを指定してマスター rdd を取得する簡単な方法はありますか? (つまり、2017/01/20 から 2017/02/14 まで)

私はスパークにまったく慣れていないので、どこかで間違っていたら訂正してください。

4

1 に答える 1

1

sqlContext に固執する場合、簡単な解決策は、入力ディレクトリ内のすべてのファイルをリストするメソッドを定義することです

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) {
 def path = s"${basePath}/${year}/${month}/${day}"
}

def listFileSources() : List[FileWithDate] = ??? // implement here

ソースからすべてのデータフレームを結合したい場合は、次のように実行できます。

// create an empty dataframe with the strucutre for the json
val files = listSources()
val allDFs = files.foldLeft(emptyDF){case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path))}

入力ファイルを日付でフィルタリングしたい場合は簡単です。このようなもの

files.filter(_.year == 2016 && (_.month >=2 || _.month <=3))

もう1つの解決策は、年、月、日でデータフレームを拡張し(追加の列を配置)、新しいデータフレームですべてのビジネスロジックを実行することです

于 2017-02-14T13:00:00.660 に答える