2

私はHadoopの初心者です。

私の目的は、大きな番号をアップロードすることです。さまざまな拡張子を持つファイルを Hadoop クラスターに配置すると、次のような出力が得られます。

拡張子 ファイル数

.jpeg 1000 .java 600 .txt 3000

等々。

拡張子を読み取ることができるように、ファイル名がマッパーメソッドのキーである必要があると想定しています(将来的には、他のファイル操作を実行します)

          public void map(Text fileName,
                   null/*will this do - value isn't required in this case*/,
                   OutputCollector<Text,IntWritable> output,
                   Reporter reporter)
                   throws IOException
           {
            Text extension = new Text(FilenameUtils.getExtension(filename));
            output.collect(extension, 1); 
        }

          public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
          int sum = 0; 
          while (values.hasNext()) { 
          sum += values.next().get(); 
          } 
          output.collect(key, new IntWritable(sum)); 
          }
          }

クエリ:

  1. ファイルの名前をキーとしてマッパーに送信する方法は? 私はRecordReaderインターフェイスを実装することを考えていましたが、それが必要かどうかはわかりませんでした。
  2. API と私の理解によると、InputFormatの実装は処理のための分割を提供する責任があります。仕事を完了するためにここで何かをする必要がありますか?

Hadoop MapReduce の概念に関して根本的に間違った仮定を行った場合は、ご教示ください。

-------------------1回目の編集-------------------

コード、出力、およびクエリの添付:

/**
 * 
 */
package com.hadoop.mapred.scratchpad;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;


public class Main {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub

        Main main = new Main();

        if (args == null || args.length == 0) {
            throw new RuntimeException("Enter path to read files");
        }

        main.groupFilesByExtn(args);
    }

    private void groupFilesByExtn(String[] args) throws IOException {
        // TODO Auto-generated method stub

        JobConf conf = new JobConf(Main.class);
        conf.setJobName("Grp_Files_By_Extn");

        /* InputFormat and OutputFormat from 'mapred' package ! */
        conf.setInputFormat(CustomFileInputFormat.class);
        conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);

        /* No restrictions here ! */
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        /* Mapper and Reducer classes from 'mapred' package ! */
        conf.setMapperClass(CustomMapperClass.class);
        conf.setReducerClass(CustomReducer.class);
        conf.setCombinerClass(CustomReducer.class);

        CustomFileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }

}

カスタマイズされた FileInputFormat

/**
 * 
 */
package com.hadoop.mapred.scratchpad;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomFileInputFormat extends
        FileInputFormat<String, NullWritable> {

    @Override
    public RecordReader<String, NullWritable> getRecordReader(InputSplit aFile,
            JobConf arg1, Reporter arg2) throws IOException {
        // TODO Auto-generated method stub

        System.out.println("In CustomFileInputFormat.getRecordReader(...)");
        /* the cast - ouch ! */
        CustomRecordReader custRecRdr = new CustomRecordReader(
                (FileSplit) aFile);

        return custRecRdr;
    }

}

カスタマイズされた RecordReader

/**
 * 
 */
package com.hadoop.mapred.scratchpad;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;

public class CustomRecordReader implements RecordReader<String, NullWritable> {

    private FileSplit aFile;
    private String fileName;

    public CustomRecordReader(FileSplit aFile) {

        this.aFile = aFile;

        System.out.println("In CustomRecordReader constructor aFile is "
                + aFile.getClass().getName());
    }

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub

    }

    @Override
    public String createKey() {
        // TODO Auto-generated method stub
        fileName = aFile.getPath().getName();

        System.out.println("In CustomRecordReader.createKey() "+fileName);

        return fileName;
    }

    @Override
    public NullWritable createValue() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public long getPos() throws IOException {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public float getProgress() throws IOException {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public boolean next(String arg0, NullWritable arg1) throws IOException {
        // TODO Auto-generated method stub
        return false;
    }

}

マッパー

package com.hadoop.mapred.scratchpad;

import java.io.IOException;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CustomMapperClass extends MapReduceBase implements
        Mapper<String, NullWritable, Text, IntWritable> {

    private static final int COUNT = 1;

    @Override
    public void map(String fileName, NullWritable value,
            OutputCollector<Text, IntWritable> outputCollector,
            Reporter reporter) throws IOException {
        // TODO Auto-generated method stub
        System.out.println("In CustomMapperClass.map(...) : key " + fileName
                + " value = " + value);

        outputCollector.collect(new Text(FilenameUtils.getExtension(fileName)),
                new IntWritable(COUNT));

        System.out.println("Returning from CustomMapperClass.map(...)");
    }

}

減力剤:

/**
 * 
 */
package com.hadoop.mapred.scratchpad;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;


public class CustomReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text fileExtn, Iterator<IntWritable> countCollection,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // TODO Auto-generated method stub

        System.out.println("In CustomReducer.reduce(...)");
        int count = 0;

        while (countCollection.hasNext()) {
            count += countCollection.next().get();
        }

        output.collect(fileExtn, new IntWritable(count));

        System.out.println("Returning CustomReducer.reduce(...)");
    }

}

出力 (hdfs) ディレクトリ:

hd@cloudx-538-520:~/hadoop/logs/userlogs$ hadoop fs -ls /scratchpad/output
Warning: $HADOOP_HOME is deprecated.

Found 3 items
-rw-r--r--   4 hd supergroup          0 2012-10-11 20:52 /scratchpad/output/_SUCCESS
drwxr-xr-x   - hd supergroup          0 2012-10-11 20:51 /scratchpad/output/_logs
-rw-r--r--   4 hd supergroup          0 2012-10-11 20:52 /scratchpad/output/part-00000
hd@cloudx-538-520:~/hadoop/logs/userlogs$
hd@cloudx-538-520:~/hadoop/logs/userlogs$ hadoop fs -ls /scratchpad/output/_logs
Warning: $HADOOP_HOME is deprecated.

Found 1 items
drwxr-xr-x   - hd supergroup          0 2012-10-11 20:51 /scratchpad/output/_logs/history
hd@cloudx-538-520:~/hadoop/logs/userlogs$
hd@cloudx-538-520:~/hadoop/logs/userlogs$

ログ(1つだけ開いた):

hd@cloudx-538-520:~/hadoop/logs/userlogs/job_201210091538_0019$ ls -lrt
total 16
-rw-r----- 1 hd hd 393 2012-10-11 20:52 job-acls.xml
lrwxrwxrwx 1 hd hd  95 2012-10-11 20:52 attempt_201210091538_0019_m_000000_0 -> /tmp/hadoop-hd/mapred/local/userlogs/job_201210091538_0019/attempt_201210091538_0019_m_000000_0
lrwxrwxrwx 1 hd hd  95 2012-10-11 20:52 attempt_201210091538_0019_m_000002_0 -> /tmp/hadoop-hd/mapred/local/userlogs/job_201210091538_0019/attempt_201210091538_0019_m_000002_0
lrwxrwxrwx 1 hd hd  95 2012-10-11 20:52 attempt_201210091538_0019_m_000001_0 -> /tmp/hadoop-hd/mapred/local/userlogs/job_201210091538_0019/attempt_201210091538_0019_m_000001_0
hd@cloudx-538-520:~/hadoop/logs/userlogs/job_201210091538_0019$
hd@cloudx-538-520:~/hadoop/logs/userlogs/job_201210091538_0019$ cat attempt_201210091538_0019_m_000000_0/stdout
In CustomFileInputFormat.getRecordReader(...)
In CustomRecordReader constructor aFile is org.apache.hadoop.mapred.FileSplit
In CustomRecordReader.createKey() ExtJS_Notes.docx
hd@cloudx-538-520:~/hadoop/logs/userlogs/job_201210091538_0019$
hd@cloudx-538-520:~/hadoop/logs/userlogs/job_201210091538_0019$

見られるように:

  1. HDFS の出力は 0kb ファイルです
  2. ログは、スレッドが CustomRecordReader になるまで sysout のみを表示します

私が見逃したのは何ですか?

4

1 に答える 1

1

カリユグ、

必要に応じて、ファイル名をマッパーに渡す必要はありません。マッパーではすでに利用可能です。以下のようにアクセスするだけです。それ以外は非常に単純です。単純な単語カウント プログラムを模倣するだけです。

  FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
  String fileName = fileSplit.getPath().getName();

新しい API の場合、レポーターはコンテキストに変更する必要があります

パフォーマンスを最適化するには、ファイル名をマッパーのキーとして単純に提供するレコード リーダーを作成するだけです (上記と同じアプローチ)。レコードリーダーがファイルの内容を読み取らないようにします。値の部分を NullWritable にします。

Mapper はファイル名をキーとして取得します。レデューサー < file_extension,1 > に < key,value > ペアとして発行するだけです。

Reducer は wordcount と同じロジックを実行する必要があります。

于 2012-09-18T19:35:17.373 に答える