2

レデューサーで MultipleOutputs クラスを使用して、それぞれが独自の構成を持つことができる複数の出力を書き込むにはどうすればよいでしょうか? MultipleOutputs javadoc にいくつかのドキュメントがありますが、テキスト出力に限定されているようです。MultipleOutputs は各出力の出力パス、キー クラス、および値クラスを処理できますが、他の構成プロパティの使用を必要とする出力形式を使用しようとすると失敗します。

(この質問は何度か出てきましたが、質問者が実際には別の問題を抱えていたため、回答しようとしても失敗しました。この質問に答えるのに数日以上の調査が必要だったので、私は自分の質問に答えていますここでは、このメタ スタック オーバーフローの質問で提案されています。)

4

2 に答える 2

2

MultipleOutputs の実装をくまなく調べたところ、outputDir、キー クラス、および値クラス以外のプロパティを持つ OutputFormatType がサポートされていないことがわかりました。独自の MultipleOutputs クラスを作成しようとしましたが、Hadoop クラスのどこかでプライベート メソッドを呼び出す必要があるため、失敗しました。

すべての場合、および出力形式と構成のすべての組み合わせで機能するように見える回避策が 1 つだけ残されています。使用したい OutputFormat クラスのサブクラスを作成します (これらは再利用可能であることがわかります)。これらのクラスは、他の OutputFormats が同時に使用されていることを理解し、それらのプロパティを格納する方法を知っています。この設計では、RecordWriter を要求される直前に、OutputFormat をコンテキストで構成できるという事実を利用しています。

私は Cassandra の ColumnFamilyOutputFormat で動作するようにこれを持っています:

package com.myorg.hadoop.platform;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public abstract class ConcurrentColumnFamilyOutputFormat 
                        extends ColumnFamilyOutputFormat 
                        implements Configurable {

private static String[] propertyName = {
        "cassandra.output.keyspace" ,
        "cassandra.output.keyspace.username" ,
        "cassandra.output.keyspace.passwd" ,
        "cassandra.output.columnfamily" ,
        "cassandra.output.predicate",
        "cassandra.output.thrift.port" ,
        "cassandra.output.thrift.address" ,
        "cassandra.output.partitioner.class"
        };

private Configuration configuration;

public ConcurrentColumnFamilyOutputFormat() {
    super();
}

public Configuration getConf() {
    return configuration;
}

public void setConf(Configuration conf) {

    configuration = conf;

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(property);
        if (value != null) {
            conf.set(propertyName[i], value);
        }
    }

}

public void configure(Configuration conf) {

    String prefix = "multiple.outputs." + getMultiOutputName() + ".";

    for (int i = 0; i < propertyName.length; i++) {
        String property = prefix + propertyName[i];
        String value = conf.get(propertyName[i]);
        if (value != null) {
            conf.set(property, value);
        }
    }

}

public abstract String getMultiOutputName();

}

レデューサーに必要な Cassandra (この場合) 出力ごとに、次のクラスがあります。

package com.myorg.multioutput.ReadCrawled;

import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;

public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {

    public StrongOutputFormat() {
        super();
    }

    @Override
    public String getMultiOutputName() {
        return "Strong";
    }

}

マッパー/リデューサー構成クラスで構成します。

    // This is how you'd normally configure the ColumnFamilyOutputFormat

ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    // This is how you tell the MultipleOutput-aware OutputFormat that
    // it's time to save off the configuration so no other OutputFormat
    // steps all over it.

new StrongOutputFormat().configure(job.getConfiguration());

    // This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
    // to out set of outputs

MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);

別の例を挙げると、FileOutputFormat の MultipleOutput サブクラスは次のプロパティを使用します。

    private static String[] propertyName = {
        "mapred.output.compression.type" ,
        "mapred.output.compression.codec" ,
        "mapred.output.compress" ,
        "mapred.output.dir"
        };

ConcurrentColumnFamilyOutputFormat上記のプロパティを使用することを除いて、上記と同じように実装されます。

于 2012-11-26T23:15:05.843 に答える
0

Cassandra の MultipleOutputs サポートを実装しました (この JIRA チケットを参照してください。現在、1.2 でのリリースが予定されています。今すぐ必要な場合は、チケットのパッチを適用できます。また、トピックに関するこのプレゼンテーションもチェックしてください。その使用法。

于 2012-11-28T14:49:39.200 に答える