hdfs から毎日テキスト ファイルを読み取り、テキスト ファイルの各行から一意のキーを抽出するスパーク ジョブを作成しました。各テキスト ファイルには、約 50000 個のキーがあります。次に、同じデータが抽出されたキーによってフィルタリングされ、hdfs に保存されます。
hdfs に、フィルター処理されたデータを含む hdfs://.../date/key という構造のディレクトリを作成したいと考えています。問題は、非常に多くのキーがあるため、hdfs への書き込みに非常に長い時間がかかることです。
それが今書かれている方法:
val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
val filteredData = cleanedData.filter(line => line.contains(key))
filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})
これをより速くする方法はありますか?抽出したキーの数にデータを再分割することを考えましたが、hdfs://.../date/key の形式で保存できません。groupByKey も試しましたが、RDD ではないため値を保存できません。
どんな助けでも大歓迎です:)