0

ストリーミング マップ削減ジョブがあります。処理用のスロットが 30 ほどあります。最初に、60 レコード (フィールドはタブ区切り) を含む単一の入力ファイルを取得します。すべてのレコードの最初のフィールドは数値で、最初のレコード番号 (最初のフィールド) は 1、2 番目のレコード番号 (最初のフィールド) は 2 などです。 . 処理の次のステップのために、これらのレコードから 30 個のファイルを作成したいと考えています。それぞれに 2 つのレコードが含まれています (均等な分布)。

これが機能するために、hadoop ジョブにレデューサーの数を 30 と指定しました。最初のフィールドがキーとして使用され、それぞれ 2 つのレコードを含む 30 個の出力ファイルが得られると予想しました。

30 個の出力ファイルを取得できますが、すべてが同じ数のレコードを含むわけではありません。一部のファイルは空 (ゼロ サイズ) です。何か案が

4

2 に答える 2

0

出力キーのタイプは何ですか? IntWritable ではなく Text を使用している場合 (ストリーミングを使用している必要があると思います)、リデュース数はキー値の UTF-8 '文字列' のバイト表現のハッシュに基づいて計算されます。これを実際に観察するための簡単な単体テストを作成できます。

public class TextHashTest {
    @Test
    public void testHash() {
        int partitions = 30;
        for (int x = 0; x < 100; x++) {
            int hash = new Text(String.valueOf(x)).hashCode();
            int part = hash % partitions;
            System.err.printf("%d = %d => %d\n", x, hash, part);            
        }
    }
}

出力は貼り付けませんが、100 個の値のうち、パーティション ビン 0 ~ 7 は値を受け取りません。

Thomas Jungblutがコメントで述べているように、カスタム パーティショナーを作成して Text 値を整数値に変換し、この数値をパーティションの総数でモジュロする必要がありますが、それでも「均等」にはならない場合があります。値自体が1アップシーケンスでない場合の分布(そうであると言うので、大丈夫です)

public class IntTextPartitioner implements Partitioner<Text, Text> {
    public void configure(JobConf job) {}

    public int getPartition(Text key, Text value, int numPartitions) {
        return Integer.valueOf(key.toString()) % numPartitions;
    }            
}
于 2012-05-30T01:45:03.003 に答える