2

hdfs から毎日テキスト ファイルを読み取り、テキスト ファイルの各行から一意のキーを抽出するスパーク ジョブを作成しました。各テキスト ファイルには、約 50000 個のキーがあります。次に、同じデータが抽出されたキーによってフィルタリングされ、hdfs に保存されます。

hdfs に、フィルター処理されたデータを含む hdfs://.../date/key という構造のディレクトリを作成したいと考えています。問題は、非常に多くのキーがあるため、hdfs への書き込みに非常に長い時間がかかることです。

それが今書かれている方法:

val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
    val filteredData = cleanedData.filter(line => line.contains(key))
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})

これをより速くする方法はありますか?抽出したキーの数にデータを再分割することを考えましたが、hdfs://.../date/key の形式で保存できません。groupByKey も試しましたが、RDD ではないため値を保存できません。

どんな助けでも大歓迎です:)

4

3 に答える 3

0

入力用に 2 つのパーティションのみを指定し、出力用に 1 つのパーティションを指定しています。この影響の 1 つは、これらの操作の並列処理が大幅に制限されることです。なぜこれらが必要なのですか?

フィルター処理された 50,000 の RDD を計算する代わりに、キーで直接グループ化するのはどうですか? それらを別のディレクトリに出力したいのですが、それが実際にここでボトルネックを引き起こしています。(キー、値) 結果を単純に読み取ることができる、これを設計する別の方法はおそらくありますか?

于 2014-10-11T19:49:22.577 に答える
0

アプローチはWrite to multiple outputs by key Spark - one Spark jobに似ているはずだと思います。パーティション番号はディレクトリ番号とは関係ありません。これを実装するには、別のディレクトリに保存するために、カスタマイズしたバージョンで generateFileNameForKeyValue をオーバーライドする必要がある場合があります。

スケーラビリティに関しては、spark の問題ではなく、hdfs の問題です。しかし、どのように実装しても、要件が変更されない限り、それは避けられません。しかし、Hdfs はおそらく 50,000 個のファイル ハンドラーで問題ないと思います

于 2014-10-11T19:40:47.190 に答える