2

HDFS の出力として CSV ファイルを生成する単純な Hadoop ジョブを維持しています。ジョブは TextOutputFormat を使用します。csv ファイルに先頭のヘッダー行を追加したい (パーツ ファイルが異なるワーカーによって作成されることはわかっていますが、それぞれがヘッダーを取得しても問題ありません)。これを達成する方法は?

編集:カスケードは役立ちますが、一見したところ、新しいフレームワークの使用を開始したくありません

編集:

そのため、出力 CSV ファイルのヘッダーを追加したいと考えています。列の数は決定論的です。これが私のReducerクラスのスケルトンです:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public final class Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private MultipleOutputs<Text, IntWritable> mos;

    private static final Text KEY_HOLDER = new Text();

    private static final IntWritable VALUE_HOLDER = new IntWritable(1);

    @Override
    public void setup(final Context context)
    {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void cleanup(final Context context) throws IOException, InterruptedException
    {
        mos.close();
    }

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
            throws IOException, InterruptedException
    {
        // [... some business logic ...]        
        mos.write(KEY_HOLDER, VALUE_HOLDER, "myFileName");
        context.progress();
    }
}
4

1 に答える 1

0

要件に応じてヘッダーを追加するために、マッパー/リデューサー クラスの run() をオーバーライドできます。最終的な o/p に FisrtName と LastName を追加する場合は、以下のコードを参考にしてください。

public void run(Context context) throws IOException, InterruptedException
  {
        setup(context);
        column = new Text("ColumnName") ;
        values = new Text("FirstName" + "\t" + "LastName") ;
        context.write(column, values);
        try
        {
          while (context.nextKey())
          {
            reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator<IntWritable> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator)
            {              ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();        
            }
          }
        }
        finally
        {
          cleanup(context);
        }
  }
于 2016-11-13T04:20:46.717 に答える