3

私は Hadoop の初心者ですが、これは先月の私の学習プロジェクトでした。

他の人に役立つように、このあいまいさを維持するために、最初に基本的な目標を捨てさせてください.... 仮定:

  1. 何百万もの基本的な ASCII テキスト ファイルからなる大規模なデータ セット (明らかに) があります。
    • 各ファイルは「レコード」です。
  2. レコードは、顧客と日付を識別するためにディレクトリ構造に保存されます
    • 例 /user/hduser/data/customer1/YYYY-MM-DD、/user/hduser/data/customer2/YYYY-MM-DD
  3. 出力構造の入力構造を模倣したい
    • 例 /user/hduser/out/customer1/YYYY-MM-DD、/user/hduser/out/customer2/YYYY-MM-DD

複数のスレッドを見てきました:

他にもたくさんあります.Tom White の Hadoop の本も読んでいます。私は熱心にこれを学ぼうとしました。私は頻繁に新しい API と古い API を交換してきましたが、これはこれを学ぼうとする混乱を助長しています。

多くの人がMultipleOutputs (または古い API バージョン) を指摘していますが、目的の出力を生成できないようです。たとえば、MultipleOutputs は write() でディレクトリ構造を作成するための「/」を受け入れないようです

目的の出力構造を持つファイルを作成するには、どのような手順を実行する必要がありますか? 現在、WholeFileInputFormatクラスと、(NullWritable K, ByteWritable V) ペア (必要に応じて変更可能) を持つ関連する RecordReader があります。

私のマップ設定:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/
        mos = new MultipleOutputs(context);
    }
}

mos.close()を呼び出すcleanup()関数もあり、map()関数は現在不明です (ここで助けが必要です)

これは、初心者に答えの方向を示すのに十分な情報ですか? 次に考えたのは、すべての map() タスクで MultipleOutputs() オブジェクトを作成し、それぞれに新しい baseoutput String を作成することでしたが、それが効率的であるか、適切な種類のアクションであるかはわかりません。

アドバイスをいただければ幸いです。入力を除いて、この時点でプログラム内のすべてが変更される可能性があります-私はフレームワークを学ぼうとしています-しかし、できるだけこの結果に近づきたいです(おそらく後でレコードを結合してより大きなファイルにすることを考えていますが、すでにレコードあたり 20MB になっており、メモ帳で読み取れないようにする前に、それが機能することを確認したいと考えています。

編集: TextOutputFormat.class を変更/拡張することで、この問題を解決できますか? 機能するメソッドがいくつかあるようですが、どのメソッドをオーバーライドする必要があるのか​​ わかりません...

4

1 に答える 1

5

投機的実行をオフにすると、マッパーで出力フォルダー構造/ファイルを手動で作成し、それらにレコードを書き込むことを止めるものは何もありません (出力コンテキスト/コレクターを無視します)。

たとえば、スニペット (セットアップ メソッド) を拡張すると、次のようなことができます (これは基本的に複数の出力が行っていることですが、2 つのマップ タスクが同じファイルに書き込もうとするファイルの衝突を避けるために投機的実行がオフになっていると仮定します)。出力ファイル):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    private Text outputValue;
    private Text outputKey;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record
        // ...

        // write to output file using writer rather than context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

考慮すべき点:

  • パスcustomerx/yyyy-MM-ddファイルまたはファイルのフォルダー (ファイルのフォルダーの場合は、それに応じて修正する必要があります。この実装では、日付ごとに 1 つのファイルがあり、ファイル名が yyyy-MM-dd であると想定しています)
  • 空の出力マップ ファイルが作成されないように、 LazyOutputFormatを調べることをお勧めします。
于 2013-07-30T23:30:34.577 に答える