問題は、Hadoop ストリーミング (のみ) を使用して、Hadoop でジョブをチェーンする方法です。
1 に答える
この答えは、質問者が実際に質問に入れたものです。普通は引用しますが、あまりにも大きいので割愛します。
これは、 Hadoop Streaming (現在 1.0.3)を使用して、2 つ以上のストリーミング ジョブをチェーンする方法に関するドキュメントです。
チェーンを実行する最終的なコードを理解し、他のチェーン ジョブを記述できるようにするには、予備的かつ実用的な理論が必要です。
まず、Hadoopでの仕事とは?Hadoop ジョブは
hadoopJob = Configuration + Execution
どこ、
構成 : 実行を可能にするすべてのセットアップ。
実行 : 目的のタスクを実行する実行可能ファイルまたはスクリプト ファイルのセット。言い換えれば、私たちのタスクのマップと削減のステップです。
Configuration = hadoopEnvironment + userEnvironment
どこ、
hadoopEnvironment : Hadoop の一般的な環境のセットアップです。この一般的な環境は、リソース、つまり $HADOOP_HOME/conf ディレクトリにある xml ファイルから定義されます。たとえば、core-site.xml、mapred-site.xml、および hadoop-site.xml などのリソースは、hdfs 一時ディレクトリ、ジョブ トラッカー、およびクラスター ノードの数をそれぞれ定義します。
userEnvrironment : ジョブの実行時にユーザーが指定した引数です。Hadoop では、これらの引数はオプションと呼ばれます。
userEnvironment = genericOptions + streamingOptions
どこ、
genericOptions : ジョブとは独立して、すべてのストリーミング ジョブにアピールするという意味で一般的です。それらは GenericsOptionsParser から処理されます。
streamingOptions : 特定のジョブにアピールするという意味でジョブ固有です。たとえば、すべてのジョブには独自の入力および出力ディレクトリまたはファイルがあります。それらは StreamJob から処理されます。
概略的には、
hadoopJob
/\
/ \
/ \
/ \
/ \
Configuration Execution
/\ |
/ \ |
/ \ executable or script files
/ \
/ \
/ \
hadoopEnvironment userEnvironment
| /\
| / \
| / \
$HADOOP_HOME/conf / \
/ \
genericOptions streamingOptions
| |
| |
GenericOptionsParser StreamJob
誰でもわかるように、上記のすべてが一連の構成です。一部はクラスターの管理者 ( hadoopEnvironment ) 用で、もう 1 つはクラスターのユーザー ( userEnvironment ) 用です。結論として、ジョブは主に抽象的なレベルでの構成です。実行部分は忘れてください。
私たちのコードは、上記のすべてを処理する必要があります。これでコードを書く準備が整いました。
まず、コードレベルでの Hadoop ジョブとは何ですか? jarファイルです。ジョブを送信するたびに、いくつかのコマンド ライン引数を含む jar ファイルを送信します。たとえば、単一のストリーミング ジョブを実行する場合、次のコマンドを実行します。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
どこ、
私たちの仕事は hadoop-streaming-1.0.3.jar です
コマンドライン引数付き -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
この jar の中には、すべてを適切に処理するクラスがあります。
したがって、TestChain.java などの新しい Java ファイルを開きます。
// import everything needed
public class TestChain
{
//code here
public static void main( String[] args) throws Exception
{
//code here
}//end main
}//end TestChain
hadoopEnvironment を処理するには、クラスはクラスConfiguredを継承する必要があります。Class Configured は、Hadoop の環境とパラメーター、つまり前述のリソースへのアクセスを提供します。リソースは、名前と値のペアの形式でデータを含む xml ファイルです。
今後、すべてのインターフェースは多かれ少なかれ、外界と世界が達成したいタスクとの間の媒体です。つまり、インターフェイスはタスクを達成するために使用するツールです。したがって、私たちのクラスはツールです。このために、クラスはメソッド run() を宣言するToolインターフェイスを実装する必要があります。このメソッドは、もちろんインターフェイスが実装されている場合のツールの動作を定義します。最後に、ツールを使用するために、クラスToolRunnerを使用します。ToolRunner は、クラスGenericOptionsParserを通じて、userEnvironment からも genericOptions を処理するのに役立ちます。
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
全体像を完成させるために、メソッド run() はドライバーとも呼ばれ、ジョブの初期化と構成を含め、ジョブをセットアップします。上記で、メソッド ToolRunner.run()の最初のパラメーター「new Configuration 」によって、hadoopEnnvironment を処理する ToolRunner に委譲したことに注意してください。
これまでに何をしましたか?ツールが動作する環境を設定するだけです。ここで、チェーンを実行するためのツールを定義する必要があります。
すべてのチェーン ジョブがストリーミング ジョブである限り、それぞれをそのように作成します。これは、クラスStreamJobの StreamJob.createJob( String[] args) メソッドを使用して行います。String の args マトリックスには、各ジョブの「コマンド ライン」引数が含まれています。これらのコマンド ライン引数は、userEnvironment の streamingOptions (ジョブ固有) を参照します。さらに、これらの引数は、パラメーター/値のペアの形式になっています。たとえば、ジョブの入力として in.txt ファイル、出力ディレクトリとして /out/、マッパーとして m.py、リデューサーとして r.py がある場合、
String[] example = new String[]
{
"-mapper" , "m.py"
"-reducer" , "r.py"
"-input" , "in.txt"
"-output" , "/out/"
}
2つのことに注意する必要があります。まず、「-」が必要です。パラメータと値を区別するのは、その小さなことです。ここで、mapper はパラメーターで、m.py はその値です。「~」で違いが分かります。次に、パラメーターの左の " と "-" の間にスペースを追加すると、このパラメーターは無視されます。" -mapper" がある場合、" -mapper" はパラメーターとは見なされません。パラメータ/値のペア. 最後に, ジョブは大まかに構成であることを思い出してください. StreamJob.creatJob() が構成またはそれに類似したものを返すことを期待しています. 実際に StreamJob.createJob() はJobConfオブジェクトを返します. .
チェーンするジョブが 3 つあると仮定すると、
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
//code here
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
//code here
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
この時点で、ツールが動作する環境を設定し、その動作を定義しました。ただし、私たちはそれを実行していません。ToolRunner だけでは不十分です。ToolRunner は、ツール全体を実行します。個々のチェーン ジョブは実行されません。私たちはこれをしなければなりません。
これには 2 つの方法があります。1 つ目は JobClient を使用する方法で、2 つ目は JobControl を使用する方法です。
最初の方法-JobClient
JobClient を使用すると、チェーン ジョブをシーケンスとして実行します。ジョブごとに JobClient を呼び出すことで、1 つのジョブが次々と実行されます。個々のジョブを実行するメソッドは JobClient.runJob( jobtorun) で、jobtorun は JobConf オブジェクトです。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
JobClient を使用するこの方法の利点は、ジョブの進行状況が標準出力に出力されることです。
JobClient の欠点は、ジョブ間の依存関係を処理できないことです。
2 番目の方法 - JobControl
JobControl を使用すると、すべてのチェーン ジョブがジョブ グループの一部になります。ここでは、すべてのジョブがそのグループのフレームで実行されます。これは、すべてのチェーン ジョブを最初にグループに追加する必要があり、次にそのグループが実行されることを意味します。グループは FIFO であるか、グループ内の各ジョブの実行は FCFS (先着順) スキーマに従います。各ジョブは、メソッド JobControl.addJob( jobtoadd) を使用してグループに追加されます。
JobControl は、ジョブ x がジョブ y に依存するメソッド x.addDependingJob( y) を介して依存関係を処理できます。つまり、ジョブ x は、ジョブ y が終了するまで実行できません。ジョブ x がジョブ y と z の両方に依存し、z が y から独立している場合、 x.addDependingJob( y) と x.addDependingJob( z) を使用してこれらの依存関係を表現できます。
JobControl は、JobClient とは対照的に、Jobオブジェクトで「動作」します。たとえば x.addDependingJob( y) メソッドを呼び出すと、x、y は Job オブジェクトです。同じことが JobControl.addJob( jobtoadd) にも当てはまり、jobtoadd は Job オブジェクトです。各 Job オブジェクトは JobConf オブジェクトから作成されます。私たちが持っているコードに戻ると、
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
//code here
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
上記のコードでは、job3 が job2 に依存していることに注意してください。ご覧のとおり、job3 の入力は job2 の出力です。この事実は依存関係です。job3 は、job2 が終了するまで待機します。
これまでは、チェーン ジョブをグループに追加し、それらの依存関係を記述していました。この一連のジョブを実行するには、最後にもう 1 つ必要です。
力ずくで言えば、メソッド JobControl.run() を呼び出すだけです。このアプローチは機能しますが、問題があります。チェーン ジョブが終了しても、ジョブ全体が永久に実行されます。適切に機能するアプローチは、(ジョブの実行時に) 既に存在するジョブ スレッドから実行の新しいスレッドを定義することです。次に、チェーン ジョブが完了するまで待ってから終了します。チェーン ジョブの実行中に、ジョブの実行情報を要求できます。たとえば、終了したジョブの数や、ジョブが無効な状態にあるかどうか、およびその状態を確認できます。
JobControl を使用するこの方法の利点は、ジョブ間に存在する可能性のある多くの依存関係を処理できることです。
JobControl の欠点は、ジョブの進行状況が標準出力に出力されず、そのままでは表示されないことです。ジョブが失敗しても成功しても、有用なものは何も出力されません。ジョブの状態などを追跡するには、Hadoop の Web UI を確認するか、以下の while ループにコードを追加する必要があります。ついに、
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
//TestChain below is an arbitrary name for the group
JobControl jobc = new JobControl( "TestChain");
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
Job job1 = new Job( job1conf);
jobc.addJob( job1);
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
Job job2 = new Job( job2conf);
jobc.addJob( job2);
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "/out2/par*"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
Job job3 = new Job( job3conf);
job3.addDependingJob( job2);
jobc.addJob( job3);
Thread runjobc = new Thread( jobc);
runjobc.start();
while( !jobc.allFinished())
{
//do whatever you want; just wait or ask for job information
}
return 0;
}//end run
public static void main( String[] args) throws Exception
{
// ToolRunner handles generic command line options
int res = ToolRunner.run( new Configuration(), new TestChain(), args);
System.exit( res);
}//end main
}//end TestChain
エラー
このセクションでは、発生する可能性のあるいくつかのエラーについて説明します。以下のエラー メッセージには、OptimizingJoins クラスがあります。このクラスは、さまざまなエラーを示すためだけのクラスであり、この議論とは関係ありません。
コンパイル中にパッケージが存在しません。
これはクラスパスの問題です。同様にコンパイルします (たとえば、hadoop-streaming-1.0.3.jar パッケージを追加するには)、
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
不足しているパッケージを追加します。
java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
総誤差は、
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more
これは、jar ファイルのマニフェスト ファイルの問題です。上記の方法でジョブをコンパイルすると、すべて問題ありません。Java コンパイラーは、必要なものをすべて見つけます。しかし、コマンドを使用して Hadoop でジョブを実行すると、
$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain
その場合、jar を実行する JVM は StreamJob を見つけることができません。これを解決するために、jar ファイルを作成するときに、StreamJob のクラスパスを含むマニフェスト ファイルを jar に入れます。特に、
MANIFEST.MF
Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)
MANIFEST.MF ファイルは常に空白行で終わることに注意してください。MANIFEST.MF ファイルには 3 行ではなく 4 行あります。次に、次のような jar ファイルを作成します。
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class
エラー streaming.StreamJob: 認識されないオプション: -D
このエラーは、StreamJob が -D オプションを解析できないために発生します。StreamJob は、ストリーミング、ジョブ固有のオプションのみを解析できます。-D は一般的なオプションです。
この問題には 2 つの解決策があります。最初の解決策は、-D の代わりに -jobconf オプションを使用することです。2 番目の解決策は、GenericOptionsParser オブジェクトを介して -D オプションを解析することです。もちろん、2 番目のソリューションでは、StreamJob.createJob() 引数から -D オプションを削除する必要があります。
例を挙げると、2 番目のソリューションの「クリーンな」コード実装は次のとおりです。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
public class TestChain
{
public class Job1 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job1 = new String[]
{
"-mapper" , "m1.py"
"-reducer" , "r1.py"
"-input" , "in1.txt"
"-output" , "/out1/"
}
JobConf job1Conf = new StreamJob.createJob( job1);
JobClient.runJob( job1Conf);
return 0;
}//end run
}
public class Job2 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job2 = new String[]
{
"-mapper" , "m2.py"
"-reducer" , "r2.py"
"-input" , "in2.txt"
"-output" , "/out2/"
}
JobConf job2Conf = new StreamJob.createJob( job2);
JobClient.runJob( job2Conf);
return 0;
}//end run
}
public class Job3 extends Configured implements Tool
{
public int run( String[] args) throws Exception
{
String[] job3 = new String[]
{
"-mapper" , "m3.py"
"-reducer" , "r3.py"
"-input" , "in3.txt"
"-output" , "/out3/"
}
JobConf job3Conf = new StreamJob.createJob( job3);
JobClient.runJob( job3Conf);
return 0;
}//end run
}
public static void main( String[] args) throws Exception
{
TestChain tc = new TestChain();
//Domination
String[] j1args = new String[]
{
"-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
"-D", "mapred.text.key.comparator.options=-k1,1" ,
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);
//Cost evaluation
String[] j2rgs = new String[]
{
"-D", "mapred.reduce.tasks=12 " ,
"-D", "mapred.text.key,partitioner.options=-k1,1"
};
// Let ToolRunner handle generic command-line options
int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);
//Minimum Cost
String[] j3args = new String[]
{
"-D", "mapred.reduce.tasks=1"
};
// Let ToolRunner handle generic command-line options
int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args);
System.exit( mres);
}
}//end TestChain
上記のコードでは、チェーン ジョブをカプセル化するグローバル クラス TestChain を定義します。次に、個別のチェーン ジョブを定義します。つまり、run メソッドを定義します。すべてのチェーン ジョブは、Configured を継承して Tool を実装するクラスです。最後に、TestChain のメイン メソッドから、各ジョブを順番に実行します。チェーン ジョブを実行する前に、一般的なオプションを定義することに注意してください。
コンパイル
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
ジャー
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Dom.class TestChain\$Cost.class TestChain\$Min.class
エラー security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: 入力パターン hdfs://localhost:54310/user/hduser/whateverFile が 0 ファイルに一致する
このエラーは、JobControl を使用すると発生します。たとえば、ジョブが前のジョブの出力を入力として持っている場合、この入力 - 出力ファイルがまだ存在しない場合、このエラーが発生します。JobControl は、JobClient のように 1 つずつではなく、すべての独立したジョブを「並列」で実行します。そのため、Jobcontrol は入力ファイルが存在しないジョブを実行しようとし、そのために失敗します。
この状況を回避するために、x.addDependingJob( y) を使用してこれら 2 つのジョブ間に依存関係があることを宣言します。ジョブ x はジョブ y に依存します。現在、JobControl は並列依存ジョブで実行しようとしません。