19

1_fileName.txttoの命名規則を使用して HDFS で利用できる 1000 以上のファイルがありますN_fileName.txt。各ファイルのサイズは 1024 MB です。ファイルの順序を維持しながら、これらのファイルを 1 つ (HDFS) にマージする必要があります。5_FileName.txt後にのみ追加する必要があると言う4_fileName.txt

この操作を実行する最善かつ最速の方法は何ですか。

データノード間で実際のデータをコピーせずにこのマージを実行する方法はありますか? 例: このファイルのブロックの場所を取得し、これらのブロックの場所で Namenode に新しいエントリ (FileName) を作成しますか?

4

5 に答える 5

18

これを効率的に行う方法はありません。すべてのデータを 1 つのノードに移動してから、HDFS に戻す必要があります。

これを行うコマンド ライン スクリプトレットは次のようになります。

hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt

これにより、glob に一致するすべてのファイルが標準出力に cat されます。次に、そのストリームを put コマンドにパイプし、ストリームを targetFilename.txt という名前の HDFS ファイルに出力します。

あなたが持っている唯一の問題は、あなたが行ったファイル名の構造です - 幅が固定されている場合、数字の部分をゼロで埋めた方が簡単ですが、現在の状態では予期しない辞書式の順序 (1, 10, 100, 1000) になります、11、110 など) ではなく、番号順 (1、2、3、4 など)。スクリプトレットを次のように修正することで、これを回避できます。

hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \
    [0-9][0-9[0-9]_fileName.txt | hadoop fs -put - targetFilename.txt
于 2013-02-13T00:34:40.440 に答える
5

スパークを使用できる場合。それは次のように行うことができます

sc.textFile("hdfs://...../part*).coalesce(1).saveAsTextFile("hdfs://...../filename)

Spark は分散方式で動作するため、ファイルを 1 つのノードにコピーする必要はありません。単なる注意ですが、ファイルが非常に大きい場合、spark でのファイルの結合は遅くなる可能性があります。

于 2015-08-08T01:10:16.573 に答える
1

ファイルの順序は重要であり、辞書順では目的を達成できないため、このタスク用のマッパー プログラムを作成することをお勧めします。これはおそらく定期的に実行できます。もちろん、リデューサーはありません。これを HDFS マップ タスクとして記述すると、データ ノード間であまりデータを移動せずにこれらのファイルを 1 つの出力ファイルにマージできるため、効率的です。ソース ファイルは HDFS にあり、マッパー タスクはデータ アフィニティを試行するため、異なるデータ ノード間でファイルを移動することなくファイルをマージできます。

マッパー プログラムには、カスタム InputSplit (入力ディレクトリ内のファイル名を取得し、必要に応じて並べ替える) とカスタム InputFormat が必要です。

マッパーは、hdfs 追加、または byte[] に書き込むことができる未加工の出力ストリームのいずれかを使用できます。

私が考えている Mapper プログラムの大まかなスケッチは次のようなものです。

public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??> 
{
    FileSystem fs;

    public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException 
    {

        //Convert the destFile to Path.
        ...
        //make sure the parent directory of destFile is created first.
        FSDataOutputStream destOS = fs.append(destFilePath);
        //Convert the sourceFiles to Paths.
        List<Path> srcPaths;
        ....
        ....
            for(Path p: sourcePaths) {

                FSDataInputStream srcIS = fs.open(p);
                byte[] fileContent
                srcIS.read(fileContent);
                destOS.write(fileContent);
                srcIS.close();
                reporter.progress();  // Important, else mapper taks may timeout.
            }
            destOS.close();


        // Delete source files.

        for(Path p: sourcePaths) {
            fs.delete(p, false);
            reporter.progress();
        }

    }
}
于 2013-08-19T16:21:49.020 に答える