0

俳優と彼らが演じた映画 (1 行に 1 つの映画) のリストを含むテキスト ファイルを読み取り、各俳優が参加した映画の数を返す Hadoop マップ/リデュース クラスを作成しようとしています。

最後に、結果を映画の数で並べ替えたいと思います (昇順でも降順でも構いません)。ただし、私のコードでは、映画のタイトルの文字数で結果を並べ替えているようです。出力を逆にする (Text、IntWritable から IntWritable、Text) ことや、別のコンパレーターを使用することなど、考えられるすべてのことを試しましたが、映画の数で結果を並べ替えることができませんでした。

とても単純なことだと思いますが、一生理解できませんでした。アドバイスをいただければ幸いです。

データファイルからの抜粋:

Chan, Jackie (I)    The Forbidden Kingdom   2008
Chan, Jackie (I)    Kung Fu Panda 2 2011
Chan, Jackie (I)    Shanghai Noon   2000
Chan, Jackie (I)    Pik lik for 1995
Chan, Jackie (I)    The Karate Kid  2010
Chan, Jackie (I)    Shanghai Knights    2003
Chan, Jackie (I)    Around the World in 80 Days 2004
Chan, Jackie (I)    Rush Hour   1998
Chan, Jackie (I)    The Tuxedo  2002
Chan, Jackie (I)    Kung Fu Panda   2008
Chan, Jackie (I)    Rush Hour 2 2001
Chan, Jackie (I)    Rush Hour 3 2007
Davi, Robert    Licence to Kill 1989
Davi, Robert    Die Hard    1988
Davi, Robert    The Hot Chick   2002
Davi, Robert    The Goonies 1985

私のコードは以下の通りです:

// MovieCountByActor.java

package ucsc.hadoop.homework2;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import ucsc.hadoop.util.ConfigurationUtil;

public class MovieCountByActor extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(MovieCountByActor.class);

    public int run(String[] args) throws Exception {
        // Configuration conf = getConf();
        JobConf conf = new JobConf(getConf(), MovieCountByActor.class);
        conf.setOutputKeyComparatorClass(CountSort.class);
        conf.setOutputValueGroupingComparator(CountSort.class);

        if (args.length != 2) {
            System.err.println("Usage: moviecountbyactor <in> <out>");
            System.exit(2);
        }

        ConfigurationUtil.dumpConfigurations(conf, System.out);

        LOG.info("input: " + args[0] + " output: " + args[1]);

        Job job = new Job(conf, "movie count");
        job.setJarByClass(MovieCountByActor.class);
        job.setMapperClass(MovieTokenizerMapper.class);
        job.setReducerClass(MovieCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(CountSort.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        return (result) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MovieCountByActor(), args);
        System.exit(exitCode);
    }

    public static class MovieTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final static Text ACTOR = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\t");

            String actor = "";
            if (tokens.length == 3) {
                actor = tokens[0];
                ACTOR.set(actor);
                context.write(ACTOR, ONE);
            }
        }
    }

    public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int movieCountPerActor = 0;
            for (IntWritable count : values) {
                movieCountPerActor += count.get();
            }
            result.set(movieCountPerActor);
            context.write(actor, result);
        }
    }

    public static class CountSort extends WritableComparator {
        protected CountSort() {
            super (IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int j1, int k1, byte[] b2, int j2, int k2) {
            Integer a = ByteBuffer.wrap(b1, j1, k1).getInt();
            Integer b = ByteBuffer.wrap(b2, j2, k2).getInt();
            return a.compareTo(b) * -1;
        }
    }

}
4

2 に答える 2

1

何をしているのか混乱していると思いますjob.setSortComparatorClass(CountSort.class);-これは、キー値が削減される前のコンパレータです。シリアル化された Text オブジェクト (アクター名) の Int 部分を調べているだけだと思います。これは、出力がアクター名の長さで表示されている理由を説明しています (2 つのアクターがある場合、予期しない出力が表示されると思います)同じ縮小インスタンスにハッシュされたのと同じ名前の長さ。

映画の数で出力を並べ替えるには、別の M/R ジョブを実行して最初のジョブ (俳優別の映画の数) の出力を取得し、マッパーを使用してキー/値を切り替える必要があります (したがって、出力キーはカウントで、値はアクター名です)。単一のレデューサーを使用すると、ムービー カウントの昇順でアクターが取得されます。

于 2013-03-17T19:51:33.663 に答える
-2

デフォルトでは、Map Reduce はレデューサーの出力のキーを並べ替えます。そのため、特定のアクターのムービーをカウントした後にできることは、レデューサーの出力キーを moviecount として設定し、値をアクターの名前として設定することです。

次のように:

public static class MovieCountReducer extends Reducer<Text, IntWritable, IntWritable,Text> {
        private IntWritable result = new IntWritable();

        public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int movieCountPerActor = 0;
            for (IntWritable count : values) {
                movieCountPerActor += count.get();
            }
            result.set(movieCountPerActor);
            context.write(result, actor);
        }
}

また、ジョブ構成で次の変更を行います。

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
于 2013-03-18T10:03:21.820 に答える