0

これがシナリオです

           Reducer1  
         /  
Mapper - - Reducer2  
         \   
           ReducerN  

レデューサーでは、さまざまなファイルにデータを書き込みたいのですが、レデューサーが次のようになっているとしましょう

def reduce():  
  for line in sys.STDIN:  
    if(line == type1):
      create_type_1_file(line)
    if(line == type2):
      create_type_2_file(line)
    if(line == type3):
      create_type3_file(line)
      ... and so on  
def create_type_1_file(line):
  # writes to file1  
def create_type2_file(line):
  # writes to file2  
def create_type_3_file(line):
  # write to file 3  

次のように書き込むパスを検討してください。

file1 = /home/user/data/file1  
file2 = /home/user/data/file2  
file3 = /home/user/data/file3  

で実行するとpseudo-distributed mode(machine with one node and hdfs daemons running)、すべてのデーモンが同じファイルのセットに書き込むため、問題はありません

質問: - 1000 台のマシンのクラスターでこれを実行すると、それらは同じファイル セットに書き込みますか? 私はwriting to local filesystemこの場合、でこの操作を実行するためのより良い方法はありhadoop streamingますか?

4

2 に答える 2

0

通常、reduce の o/p は、HDFS などの信頼できるストレージ システムに書き込まれます。これは、ノードの 1 つがダウンすると、そのノードに関連付けられている reduce データが失われるためです。Hadoop フレームワークのコンテキスト外で、その特定の reduce タスクを再度実行することはできません。また、ジョブが完了したら、1000 ノードからの o/p をさまざまな入力タイプに統合する必要があります。

HDFSでは同時書き込みはサポートされていません。複数のレデューサーが HDFS の同じファイルに書き込みを行っている可能性があり、これによりファイルが破損する可能性があります。1 つのノードで複数の reduce タスクが実行されている場合、1 つのローカル ファイルへの書き込み時にも同時実行性が問題になる可能性があります。

解決策の 1 つは、タスク固有のファイル名を減らし、後で特定の入力タイプのすべてのファイルを結合することです。

于 2011-10-11T01:38:31.333 に答える
0

MultipleOutputs クラスを使用して、Reducer から出力を複数の場所に書き込むことができます。file1、file2、file3 を 3 つのフォルダーと見なして、1000 個の Reducer の出力データをこれらのフォルダーに個別に書き込むことができます。


ジョブ送信の使用パターン:

 Job job = new Job();

 FileInputFormat.setInputPath(job, inDir);

//outDir is the root path, in this case, outDir="/home/user/data/"
 FileOutputFormat.setOutputPath(job, outDir);

//You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 job.setMapperClass(MOMap.class);

 job.setReducerClass(MOReduce.class);

 ...

 job.waitForCompletion(true);

レデューサーでの使用法:

private MultipleOutputs out;

 public void setup(Context context) {

   out = new MultipleOutputs(context);

   ...

 }

 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary.
 for (Text line : values) {

    if(line == type1)
      out.write(key, new Text(line),"file1/part");

  else  if(line == type2)
      out.write(key, new Text(line),"file2/part");

 else   if(line == type3)
      out.write(key, new Text(line),"file3/part");
   }
 }

 protected void cleanup(Context context) throws IOException, InterruptedException {
       out.close();
   }

参照: https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

于 2016-07-06T07:58:11.953 に答える