2

電子ブック Mahout in Action の第 6 章 (リスト 6.1 ~ 6.4) のレコメンダーの例を実行しようとしています。2 つのマッパー/リデューサーのペアがあります。コードは次のとおりです。

マッパー - 1

public class WikipediaToItemPrefsMapper extends 
        Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\d+)");

@Override
public void map(LongWritable key,
          Text value,
          Context context)
throws IOException, InterruptedException {

    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}

}

レデューサー - 1

public class WikipediaToUserVectorReducer extends 
        Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
@Override
public void reduce(VarLongWritable userID, 
                    Iterable<VarLongWritable> itemPrefs,
                    Context context)
  throws IOException, InterruptedException {

        Vector userVector = new RandomAccessSparseVector(
        Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int)itemPref.get(), 1.0f);
        }

        //LongWritable userID_lw = new LongWritable(userID.get());
        context.write(userID, new VectorWritable(userVector));
        //context.write(userID_lw, new VectorWritable(userVector));
}

}

レデューサーは userID と userVector を出力し、次のようになります: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} ドライバーで FileInputformat と TextInputFormat が使用されている場合。

このデータをさらに処理するために、マッパーとリデューサーの別のペアを使用したいと思います。

マッパー - 2

public class UserVectorToCooccurenceMapper extends
Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable> {

@Override
public void map(VarLongWritable userID,
                VectorWritable userVector,
                Context context)
throws IOException, InterruptedException {

    Iterator<Vector.Element> it = userVector.get().iterateNonZero();
    while (it.hasNext()) {
        int index1 = it.next().index();
        Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
        while (it2.hasNext()) {
            int index2 = it2.next().index();
                context.write(new IntWritable(index1),
                                new IntWritable(index2));
        }
    }
}

}

レデューサー - 2

public class UserVectorToCooccurenceReducer extends Reducer {

@Override
public void reduce(IntWritable itemIndex1,
          Iterable<IntWritable> itemIndex2s,
          Context context)
throws IOException, InterruptedException {

    Vector cooccurrenceRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (IntWritable intWritable : itemIndex2s) {
        int itemIndex2 = intWritable.get();
        cooccurrenceRow.set(itemIndex2, cooccurrenceRow.get(itemIndex2) + 1.0);
    }
    context.write(itemIndex1, new VectorWritable(cooccurrenceRow));
}

}

これは私が使用しているドライバーです:

public final class RecommenderJob extends Configured implements Tool {

@Override public int run(String[] args) throws Exception {

  Job job_preferenceValues = new Job (getConf());
  job_preferenceValues.setJarByClass(RecommenderJob.class);
  job_preferenceValues.setJobName("job_preferenceValues");

  job_preferenceValues.setInputFormatClass(TextInputFormat.class);
  job_preferenceValues.setOutputFormatClass(SequenceFileOutputFormat.class);

  FileInputFormat.setInputPaths(job_preferenceValues, new Path(args[0]));
  SequenceFileOutputFormat.setOutputPath(job_preferenceValues, new Path(args[1]));

  job_preferenceValues.setMapOutputKeyClass(VarLongWritable.class);
  job_preferenceValues.setMapOutputValueClass(VarLongWritable.class);

  job_preferenceValues.setOutputKeyClass(VarLongWritable.class);
  job_preferenceValues.setOutputValueClass(VectorWritable.class);

  job_preferenceValues.setMapperClass(WikipediaToItemPrefsMapper.class);
  job_preferenceValues.setReducerClass(WikipediaToUserVectorReducer.class);

  job_preferenceValues.waitForCompletion(true);

  Job job_cooccurence = new Job (getConf());
  job_cooccurence.setJarByClass(RecommenderJob.class);
  job_cooccurence.setJobName("job_cooccurence");

  job_cooccurence.setInputFormatClass(SequenceFileInputFormat.class);
  job_cooccurence.setOutputFormatClass(TextOutputFormat.class);

  SequenceFileInputFormat.setInputPaths(job_cooccurence, new Path(args[1]));
  FileOutputFormat.setOutputPath(job_cooccurence, new Path(args[2]));

  job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
  job_cooccurence.setMapOutputValueClass(VectorWritable.class);

  job_cooccurence.setOutputKeyClass(IntWritable.class);
  job_cooccurence.setOutputValueClass(VectorWritable.class);

  job_cooccurence.setMapperClass(UserVectorToCooccurenceMapper.class);
  job_cooccurence.setReducerClass(UserVectorToCooccurenceReducer.class);

  job_cooccurence.waitForCompletion(true);

  return 0;

}

public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new RecommenderJob(), args);

} }

私が得るエラーは次のとおりです。

java.io.IOException: Type mismatch in key from map: expected org.apache.mahout.math.VarLongWritable, received org.apache.hadoop.io.IntWritable

修正のためのグーグルの過程で、私の問題がこの質問に似ていることがわかりました。しかし、違いは、私が既に SequenceFileInputFormat と SequenceFileOutputFormat を使用していることです。私は正しく信じています。また、org.apache.mahout.cf.taste.hadoop.item.RecommenderJob が多かれ少なかれ似たようなことをしていることもわかります。私の理解では & Yahoo チュートリアル

SequenceFileOutputFormat は、任意のデータ型をファイルに迅速にシリアル化します。対応する SequenceFileInputFormat は、ファイルを同じタイプに逆シリアル化し、前の Reducer によって出力されたのと同じ方法で次の Mapper にデータを提示します。

私は何を間違っていますか?誰かからのいくつかの指針に本当に感謝します..私はこれを修正するために一日を費やしましたが、どこにも行きませんでした:(

4

1 に答える 1

2

2 番目のマッパーには次の署名があります。

public class UserVectorToCooccurenceMapper extends 
        Mapper<VarLongWritable,VectorWritable,IntWritable,IntWritable>

ただし、ドライバー コードで次のように定義します。

job_cooccurence.setMapOutputKeyClass(VarLongWritable.class);
job_cooccurence.setMapOutputValueClass(VectorWritable.class);

レデューサーは<IntWritable, IntWritable>入力として期待しているため、ドライバー コードを次のように修正する必要があります。

job_cooccurence.setMapOutputKeyClass(IntWritable.class);
job_cooccurence.setMapOutputValueClass(IntWritable.class);
于 2012-07-26T01:31:56.360 に答える