6

問題は解決しました最終的には下部にある私の解決策を確認してください


最近、Mahout in Actionのchaper6(リスト6.1〜6.4)で推奨例を実行しようとしています。しかし、問題が発生し、グーグルで検索しましたが、解決策が見つかりません。

ここに問題があります:私はマッパーリデューサーのペアを持っています

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

レデューサーはuserIDとuserVectorを出力し、次のようになります。98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

次に、別のマッパーリデューサーのペアを使用してこのデータを処理します

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

ジョブを実行しようとすると、キャストエラーが表示されます

org.apache.hadoop.io.Textをorg.apache.mahout.math.VectorWritableにキャストできません

最初のmapper-reducerが出力をhdfsに書き込み、2番目のmapper-reducerが出力を読み取ろうとすると、マッパーは98955をVarLongWritableにキャストできますが、変換できません{590:1.0 22:1.0 9059:1.0 3: 1.0 2:1.0 1:1.0}をVectorWritableに変換します。したがって、最初のマッパーリデューサーが出力を2番目のペアに直接送信する方法があるので、データ変換を行う必要はありません。私は実際にHadoopを調べましたが、hadoop:決定的なガイドです。そのような方法はないようですが、何か提案はありますか?


問題が解決しました

解決策:SequenceFileOutputFormatを使用すると、最初のMapReduceワークフローのリデュース結果をDFSに出力して保存できます。次に、2番目のMapReduceワークフローは、マッパーの作成時にSequenceFileInputFormatクラスをパラメーターとして使用して、一時ファイルを入力として読み取ることができます。ベクトルは特定の形式のバイナリシーケンスファイルに保存されるため、SequenceFileInputFormatはそれを読み取り、ベクトル形式に戻すことができます。

次にいくつかのサンプルコードを示します。

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             
             
    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();
    
    
    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();
    
    JobClient.runJob(conf1);
    JobClient.runJob(conf2);

これで問題が発生した場合は、お気軽にご連絡ください

4

3 に答える 3

4

SequenceFileOutputFormatを使用し、出力キーと値のクラスを定義するには、最初のジョブの出力を明示的に構成する必要があります。

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputKeyClass(VectorWritable.class);

ドライバーコードを見ずに、最初のジョブの出力としてTextOutputFormatを使用し、2番目のジョブへの入力としてTextInputFormatを使用していると推測します。この入力形式は、ペアを<Text, Text>2番目のマッパーに送信します。

于 2012-04-22T13:12:57.150 に答える
1

私はHadoopの初心者です。これは私の推測にすぎないので、それを我慢してください/それが素朴だと思われる場合は指摘してください。

HDFSを節約せずにレデューサーから次のマッパーに送信するのは合理的ではないと思います。「どのデータ分割がどのマッパーに行くか」は、ローカリティ基準を満たすようにエレガントに設計されているためです(データがローカルに保存されているマッパーノードに行きます)。

HDFSに保存しないと、すべてのデータがネットワークによって送信される可能性が高く、速度が遅く、帯域幅の問題が発生する可能性があります。

于 2012-04-23T23:43:19.527 に答える
0

2番目のmap-reduceが使用できるように、最初のmap-reduceの出力を一時的に保存する必要があります。

これは、最初のmap-reduceの出力が2番目のmap-reduceにどのように渡されるかを理解するのに役立つ場合があります。(これは、 Apache nutchのGenerator.javaに基づいています)。

これは、最初のmap-reduceの出力の一時的なディレクトリです。

Path tempDir =
  new Path(getConf().get("mapred.temp.dir", ".")
           + "/job1-temp-"
           + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

最初のmap-reduceジョブの設定:

JobConf job1 = getConf();
job1.setJobName("job 1");
FileInputFormat.addInputPath(...);
sortJob.setMapperClass(...);

FileOutputFormat.setOutputPath(job1, tempDir);
job1.setOutputFormat(SequenceFileOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(...);
JobClient.runJob(job1);

出力ディレクトリがジョブ構成で設定されていることを確認してください。2番目のジョブでこれを使用します。

JobConf job2 = getConf();
FileInputFormat.addInputPath(job2, tempDir);
job2.setReducerClass(...);
JobClient.runJob(job2);

完了したら、一時的なdirをクリーンアップすることを忘れないでください。

// clean up
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);

お役に立てれば。

于 2012-04-22T10:35:20.613 に答える