MultipleOutputs の実装をくまなく調べたところ、outputDir、キー クラス、および値クラス以外のプロパティを持つ OutputFormatType がサポートされていないことがわかりました。独自の MultipleOutputs クラスを作成しようとしましたが、Hadoop クラスのどこかでプライベート メソッドを呼び出す必要があるため、失敗しました。
すべての場合、および出力形式と構成のすべての組み合わせで機能するように見える回避策が 1 つだけ残されています。使用したい OutputFormat クラスのサブクラスを作成します (これらは再利用可能であることがわかります)。これらのクラスは、他の OutputFormats が同時に使用されていることを理解し、それらのプロパティを格納する方法を知っています。この設計では、RecordWriter を要求される直前に、OutputFormat をコンテキストで構成できるという事実を利用しています。
私は Cassandra の ColumnFamilyOutputFormat で動作するようにこれを持っています:
package com.myorg.hadoop.platform;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
public abstract class ConcurrentColumnFamilyOutputFormat
extends ColumnFamilyOutputFormat
implements Configurable {
private static String[] propertyName = {
"cassandra.output.keyspace" ,
"cassandra.output.keyspace.username" ,
"cassandra.output.keyspace.passwd" ,
"cassandra.output.columnfamily" ,
"cassandra.output.predicate",
"cassandra.output.thrift.port" ,
"cassandra.output.thrift.address" ,
"cassandra.output.partitioner.class"
};
private Configuration configuration;
public ConcurrentColumnFamilyOutputFormat() {
super();
}
public Configuration getConf() {
return configuration;
}
public void setConf(Configuration conf) {
configuration = conf;
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(property);
if (value != null) {
conf.set(propertyName[i], value);
}
}
}
public void configure(Configuration conf) {
String prefix = "multiple.outputs." + getMultiOutputName() + ".";
for (int i = 0; i < propertyName.length; i++) {
String property = prefix + propertyName[i];
String value = conf.get(propertyName[i]);
if (value != null) {
conf.set(property, value);
}
}
}
public abstract String getMultiOutputName();
}
レデューサーに必要な Cassandra (この場合) 出力ごとに、次のクラスがあります。
package com.myorg.multioutput.ReadCrawled;
import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;
public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {
public StrongOutputFormat() {
super();
}
@Override
public String getMultiOutputName() {
return "Strong";
}
}
マッパー/リデューサー構成クラスで構成します。
// This is how you'd normally configure the ColumnFamilyOutputFormat
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
// This is how you tell the MultipleOutput-aware OutputFormat that
// it's time to save off the configuration so no other OutputFormat
// steps all over it.
new StrongOutputFormat().configure(job.getConfiguration());
// This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
// to out set of outputs
MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);
別の例を挙げると、FileOutputFormat の MultipleOutput サブクラスは次のプロパティを使用します。
private static String[] propertyName = {
"mapred.output.compression.type" ,
"mapred.output.compression.codec" ,
"mapred.output.compress" ,
"mapred.output.dir"
};
ConcurrentColumnFamilyOutputFormat
上記のプロパティを使用することを除いて、上記と同じように実装されます。