126

MapReduce を適用する多くの実際の状況では、最終的なアルゴリズムはいくつかの MapReduce ステップになります。

つまり、 Map1 、 Reduce1 、 Map2 、 Reduce2 などです。

したがって、次のマップの入力として必要な最後の reduce からの出力が得られます。

中間データは、(一般に) パイプラインが正常に完了すると保持したくないものです。また、この中間データは一般に何らかのデータ構造 (「マップ」や「セット」など) であるため、これらのキーと値のペアの書き込みと読み取りにあまり労力をかけたくないでしょう。

Hadoop で推奨される方法は何ですか?

後のクリーンアップを含め、この中間データを正しい方法で処理する方法を示す (簡単な) 例はありますか?

4

14 に答える 14

58

Yahoo の開発者ネットワークのこのチュートリアルがこれに役立つと思います: Chaining Jobs

を使用しJobClient.runJob()ます。最初のジョブからのデータの出力パスは、2 番目のジョブへの入力パスになります。これらは、適切なコードを使用してジョブに引数として渡され、それらを解析してジョブのパラメーターを設定する必要があります。

ただし、上記の方法は、現在は古い mapred API が行っていた方法である可能性があると思いますが、それでも機能するはずです。新しい mapreduce API にも同様のメソッドがありますが、それが何であるかはわかりません。

ジョブが終了した後に中間データを削除する限り、コードでこれを行うことができます。私が以前に行った方法は、次のようなものを使用しています。

FileSystem.delete(Path f, boolean recursive);

パスは、データの HDFS 上の場所です。このデータは、他のジョブが必要としなくなった場合にのみ削除するようにしてください。

于 2010-03-24T22:31:25.010 に答える
22

あなたがそれをすることができる多くの方法があります。

(1)カスケードジョブ

最初のジョブのJobConfオブジェクト「job1」を作成し、「input」を入力ディレクトリ、「temp」を出力ディレクトリとしてすべてのパラメータを設定します。このジョブを実行します。

JobClient.run(job1).

そのすぐ下に、2番目のジョブのJobConfオブジェクト「job2」を作成し、「temp」を入力ディレクトリ、「output」を出力ディレクトリとしてすべてのパラメータを設定します。このジョブを実行します。

JobClient.run(job2).

(2) JobClient.runを使用しないことを除いて、(1)と同じように2つのJobConfオブジェクトを作成し、それらにすべてのパラメーターを設定します。

次に、jobconfsをパラメーターとして使用して2つのJobオブジェクトを作成します。

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

jobControlオブジェクトを使用して、ジョブの依存関係を指定してから、ジョブを実行します。

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Map+のような構造が必要な場合| 減らす| Map *では、Hadoopバージョン0.19以降に付属するChainMapperクラスとChainReducerクラスを使用できます。

于 2010-07-03T17:15:29.777 に答える
7

これを行うには、実際にはいくつかの方法があります。2つに焦点を当てます。

1つは、Riffle(http://github.com/cwensel/riffle)を介して、依存関係を識別し、依存関係(トポロジカル)の順序でそれらを「実行」するための注釈ライブラリです。

または、Cascading( http://www.cascading.org/ )でCascade(およびMapReduceFlow)を使用できます。将来のバージョンではRiffleアノテーションがサポートされますが、現在は生のMRJobConfジョブでうまく機能します。

これの変形は、MRジョブを手動で管理するのではなく、CascadingAPIを使用してアプリケーションを開発することです。次に、JobConfとジョブチェーンは、CascadingplannerクラスとFlowクラスを介して内部的に処理されます。

このようにして、Hadoopジョブの管理の仕組みなどではなく、問題に集中することに時間を費やします。開発とアプリケーションをさらに簡素化するために、さまざまな言語(clojureやjrubyなど)を上に重ねることもできます。http://www.cascading.org/modules.html

于 2010-03-26T19:25:42.957 に答える
7

JobConf オブジェクトを次々と使用してジョブチェーンを実行しました。ジョブを連鎖させるために WordCount の例を取り上げました。1 つのジョブは、指定された出力で単語が繰り返される回数を計算します。2 番目のジョブは、最初のジョブの出力を入力として受け取り、指定された入力内の単語の合計を計算します。以下は、Driver クラスに配置する必要があるコードです。

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

これらのジョブを実行するコマンドは次のとおりです。

bin/hadoop jar TotalWords.

コマンドの最終ジョブ名を指定する必要があります。上記の場合、TotalWords です。

于 2014-10-15T11:40:16.733 に答える
6

コードで指定された方法で MR チェーンを実行できます。

注意してください: ドライバー コードのみが提供されています。

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

THE SEQUENCE IS

( JOB1 )MAP->REDUCE-> ( JOB2 )MAP
これはキーをソートするために行われましたが、ツリーマップを使用するなど、他にも方法があります
。 !
ありがとうございました

于 2015-10-27T13:12:19.137 に答える
4

MapReduce ジョブの barch 処理に oozie を使用できます。http://issues.apache.org/jira/browse/HADOOP-5303

于 2010-03-23T21:05:43.513 に答える
3

Jobのメソッドを利用しwaitForCompletion(true)て、ジョブ間の依存関係を定義できます。

私のシナリオでは、互いに依存している3つの仕事がありました。ドライバークラスでは、以下のコードを使用しましたが、期待どおりに機能します。

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
于 2013-01-28T07:23:37.917 に答える
3

複数の MapReduce ジョブをチェーンする Apache Mahout プロジェクトの例があります。例の 1 つは次の場所にあります。

RecommenderJob.java

http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

于 2011-05-25T23:38:52.870 に答える
1

oozie は、後続のジョブが前のジョブから直接入力を受け取るのに役立つと思います。これにより、ジョブ制御で実行される I/O 操作が回避されます。

于 2012-11-13T22:28:59.500 に答える
1

oozie などの複雑なサーバー ベースの Hadoop ワークフロー エンジンがありますが、ワークフローとして複数の Hadoop ジョブを実行できる単純な Java ライブラリがあります。ジョブ間の依存関係を定義するジョブ構成とワークフローは、JSON ファイルで構成されます。すべてが外部から構成可能であり、ワークフローの一部として既存の map reduce 実装を変更する必要はありません。

詳細はこちら。ソースコードと jar は github で入手できます。

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

プラナブ

于 2011-05-26T18:58:56.730 に答える