バイナリ実行可能ファイルとキャッシュされたアーカイブを使用して簡単な例を実行しようとしていますが、動作していないようです:
私が実行しようとしている例には、3 つのランダムな double とキーを生成するマッパーがあり、リデューサーはこれら 3 つの数値を平均して平均をログに記録します。とてもシンプルなもの。私は乱数を生成するcで簡単なEXEを書きました:
#include <cstdio>
#include <stdlib.h>
#include <time.h>
int main(int argc, char*argv[]) {
srand ( time(NULL) );
int rand1 = rand() % 10 + 1;
int rand2 = rand() % 10 + 1;
int rand3 = rand() % 10 + 1;
printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
return 0;
}
./a.out [key] を呼び出すと
私は見る
キー、ランダム 1、ランダム 2、ランダム 3
私はpythonストリーミングを使用しています。これがpythonで書かれた私のマッパーです:
#!/usr/bin/python
import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE
from misc import *
for line in sys.stdin:
line = line.strip()
sline = line.split(',')
# parse out the au and cost from the line ....
key = int( sline[0] )
au = int( sline[1])
cost = float( sline[2] )
command = "./a.out %d" % ( key )
cli_parts = shlex.split(command)
mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE,
stdout=PIPE)
print mp.communicate()[0].strip('\r\n')
平均化を行うだけのレデューサーは次のとおりです。
#!/usr/bin/python
import os
import sys
import math
import re
from misc import *
for line in sys.stdin:
line = line.strip()
m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
if m:
average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
float(m.groups(0)[3])) / 3
print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
else:
print "not found"
#f.close()
ドキュメントを読んだ後、バイナリとtar.gz-itをコンパイルする必要があるようです
1) tar cvaf a.out.tar.gz a.out
これで、-cacheArchive パラメータを介してこれをデータノードに渡すことができ、すべて正常に動作するはずです。私のHadoopコマンドは次のとおりです。
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar \ -numReduceTasks 1 \ -mapper mapper1.py \ -file mapper1.py \ -reducer reducer1.py \ - file reducer1.py \ -file misc.py \ -cacheArchive a.out.tar.gz \ -input input/* \ -output testsvmoutput \ -verbose
言うまでもなく、これは機能しません。マッパーがデータを生成していないことが原因のようです。
コマンドラインでテストして、コードが機能することを確認しました。
猫入力/svminput1.txt | python mapper1.py | ソート | python reducer1.py
なぜこれが機能しないのか、データノードで cacheArchive コマンドを介して exe を渡す方法、および/または Cloudera html パネルから出てくるエラーメッセージはあまり役に立たないため、これをデバッグする方法を誰かに説明してもらいたいです。
ありがとう
これが私が見ているエラーです:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
at org.apache.hadoop.mapred.Child.main(Child.java:211)