3

Spark RDD から MapFile を作成しようとしていますが、十分な情報が見つかりません。これまでの私の手順は次のとおりです。

私は、

rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

MapFilesソートする必要があるため、例外がスローされました。だから私は次のように変更しました:

rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

これは正常に機能し、MapFile が作成されました。したがって、次のステップはファイルにアクセスすることでした。作成されたディレクトリ名を使用すると、ファイルpartsが見つからないと言って失敗しました。Google に戻ると、使用する必要のあるパーツにdataアクセスするために次のことがわかりました。MapFile

Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p key, ret);

素朴に、私はそのビットを無視し、HashPartionerこれでエントリが見つかると期待しましたが、うまくいきませんでした。したがって、私の次のステップは、リーダーをループしてget(..). このソリューションは機能しましたが、ファイルが 128 個のタスクによって作成され、結果として 128 個のファイルが作成されたため、非常に低速でしたpart

の重要性を調べたHashPartitionerところ、内部的に使用するリーダーを識別するために使用されていることがわかりましたが、Spark は同じパーティショニング ロジックを使用していないようです。だから私は次のように変更しました:

rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

しかし、再び2HashPartionerは一致しませんでした。では、質問の部分...

  • 効率的に結合する方法はありMapFilesますか (これはパーティショニング ロジックを無視するため)。
  • MapFileOutputFormat.getReaders(new Path(file), new Configuration());非常に遅いです。読者をより効率的に識別できますか?
  • 基盤となる DFS として MapR-FS を使用しています。これは同じHashParitioner実装を使用しますか?
  • 再分割を回避する方法はありますか、それともデータをファイル全体でソートする必要がありますか? (パーティション内でソートされるのとは対照的)
  • 例外も発生しています_SUCCESS/data does not exist。このファイルを手動で削除する必要がありますか?

これに関するリンクは大歓迎です。

PS。HashPartitionerエントリがソートされている場合、を使用して正しい を見つけるにはどうすればよい Readerですか? partsこれは、データがHash Partitionedキーでソートされていることを意味します。だから私も試してみrdd.repartiotionAndSortWithinPartitions(new HashPartitioner(280))ましたが、やはり運がありませんでした。

4

1 に答える 1

3

この問題を掘り下げてみると、Spark HashPartitioner と Hadoop HashPartitioner のロジックが異なることがわかりました。

したがって、私が試して機能する「ブルートフォース」ソリューションは次のとおりです。

を使用してMapFileを保存しますrdd.repartitionAndSortWithinPArtitions(new org.apache.aprk.HashPartitioner(num_of_parititions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);

以下を使用してルックアップ:

  • Reader[] リーダー = MapFileOutputFormat.getReaders(新しいパス(ファイル),新しい構成());
  • org.apache.aprk.HashPartitioner p = 新しい org.apache.aprk.HashPartitioner(readers.length);
  • リーダー[p.getPartition(key)].get(key,val);

これは、MapFile アクセスが直感的な Hadoop HashPartitioner ではなく Spark パーティショナーにバインドされているため、「ダーティ」です。ただし、 HadoopHashPartitionerを使用して改善する Spark パーティショナーを実装することはできます。

これは、比較的多数のレデューサーへのアクセスが遅いという問題にも対処しません。パーティショナーからファイルのパーツ番号を生成することで、これをさらに「汚い」ものにすることができますが、クリーンな解決策を探しているので、この問題に対するより良いアプローチがあれば投稿してください。

于 2015-04-18T10:41:27.303 に答える