MultipleOutputs api (Hadoop バージョン 1.0.3) を使用して、cassandra から読み取り、リデューサーの出力を複数の出力ファイルに書き込もうとしています。私の場合のファイル形式は、FileOutputFormat を拡張したカスタム出力形式です。MultipleOutputs apiに示されているのと同様の方法でジョブを構成しました。ただし、ジョブを実行すると、テキスト出力形式の part-r-0000 という名前の出力ファイルが 1 つしか得られません。が設定されていない場合job.setOutputFormatClass()
、デフォルトで TextOutputFormat がフォーマットであると見なされます。また、2 つの形式クラスのうちの 1 つだけを初期化できます。で指定した出力形式は完全に無視されMulitpleOutputs.addNamedOutput(job, "format1", MyCustomFileFormat1.class, Text.class, Text.class) and MulitpleOutputs.addNamedOutput(job, "format2", MyCustomFileFormat2.class, Text.class, Text.class)
ます。他の誰かが同様の問題に直面していますか、それとも私は何か間違っていますか?
また、MultipleOutputs api に示されているように、テキスト ファイルから読み取り、TextOutputFormat と SequenceFileOutputFormat の 2 つの形式で出力を書き込む非常に単純な MR プログラムを作成しようとしました。しかし、そこにも運はありません。テキスト出力形式の出力ファイルは 1 つしかありません。
誰かがこれで私を助けることができますか?
Job job = new Job(getConf(), "cfdefGen");
job.setJarByClass(CfdefGeneration.class);
//read input from cassandra column family
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.getConfiguration().set("cassandra.consistencylevel.read", "QUORUM");
//thrift input job configurations
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), HOST);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("classification")));
//ConfigHelper.setRangeBatchSize(job.getConfiguration(), 2048);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
//specification for mapper
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//specifications for reducer (writing to files)
job.setReducerClass(ReducerToFileSystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setOutputFormatClass(MyCdbWriter1.class);
job.setNumReduceTasks(1);
//set output path for storing output files
Path filePath = new Path(OUTPUT_DIR);
FileSystem hdfs = FileSystem.get(getConf());
if(hdfs.exists(filePath)){
hdfs.delete(filePath, true);
}
MyCdbWriter1.setOutputPath(job, new Path(OUTPUT_DIR));
MultipleOutputs.addNamedOutput(job, "cdb1', MyCdbWriter1.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "cdb2", MyCdbWriter2.class, Text.class, Text.class);
boolean success = job.waitForCompletion(true);
return success ? 0:1;
public static class ReducerToFileSystem extends Reducer<Text, Text, Text, Text>
{
private MultipleOutputs<Text, Text> mos;
public void setup(Context context){
mos = new MultipleOutputs<Text, Text>(context);
}
//public void reduce(Text key, Text value, Context context)
//throws IOException, InterruptedException (This was the mistake, changed the signature and it worked fine)
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
//context.write(key, value);
mos.write("cdb1", key, value, OUTPUT_DIR+"/"+"cdb1");
mos.write("cdb2", key, value, OUTPUT_DIR+"/"+"cdb2");
context.progress();
}
public void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
public class MyCdbWriter1<K, V> extends FileOutputFormat<K, V>
{
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
}
public static void setOutputPath(Job job, Path outputDir) {
job.getConfiguration().set("mapred.output.dir", outputDir.toString());
}
protected static class CdbDataRecord<K, V> extends RecordWriter<K, V>
{
@override
write()
close()
}
}