Ubuntu 仮想マシンで、Michael Noll のチュートリアルに従って単一ノード クラスターをセットアップしました。これが、Hadoop プログラムを作成するための出発点になりました。
また、参考までに、この.
私のプログラムは Python で、Hadoop Streaming を使用しています。
私は単純なベクトル乗算プログラムを作成しました。このプログラムでmapper.py
は、入力ファイルv1
とを受け取りv2
、それぞれがフォーム内のベクトルを含み、12,33,10
積を返します。次にreducer.py
、積の合計を返します。つまり、次のようになります。
マッパー:map(mult,v1,v2)
レデューサー:sum(p1,p2,p3,...,pn)
mapper.py :
import sys
def mult(x,y):
return int(x)*int(y)
# Input comes from STDIN (standard input).
inputvec = tuple()
for i in sys.stdin:
i = i.strip()
inputvec += (tuple(i.split(",")),)
v1 = inputvec[0]
v2 = inputvec[1]
results = map(mult, v1, v2)
# Simply printing the results variable would print the tuple. This
# would be fine except that the STDIN of reduce.py takes all the
# output as input, including brackets, which can be problematic
# Cleaning the output ready to be input for the Reduce step:
for o in results:
print ' %s' % o,
レデューサー.py:
import sys
result = int()
for a in sys.stdin:
a = a.strip()
a = a.split()
for r in range(len(a)):
result += int(a[r])
print result
in
サブディレクトリには、含むv1
と5,12,20
含まv2
れてい14,11,3
ます。
ローカルでテストすると、期待どおりに動作します。
hduser@ubuntu:~/VectMult$ cat in/* | python ./mapper.py
70 132 60
hduser@ubuntu:~/VectMult$ cat in/* | python ./mapper.py | sort
70 132 60
hduser@ubuntu:~/VectMult$ cat in/* | python ./mapper.py | sort | python ./reducer.py
262
Hadoop で実行すると、正常に実行されたように見え、例外はスローされません。
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper python /home/hduser/VectMult3/mapper.py -reducer python /home/hduser/VectMult3/reducer.py -input /home/hduser/VectMult3/in -output /home/hduser/VectMult3/out4
Warning: $HADOOP_HOME is deprecated.
packageJobJar: [/app/hadoop/tmp/hadoop-unjar2168776605822419867/] [] /tmp/streamjob6920304075078514767.jar tmpDir=null
12/11/18 21:20:09 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/11/18 21:20:09 WARN snappy.LoadSnappy: Snappy native library not loaded
12/11/18 21:20:09 INFO mapred.FileInputFormat: Total input paths to process : 2
12/11/18 21:20:09 INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
12/11/18 21:20:09 INFO streaming.StreamJob: Running job: job_201211181903_0009
12/11/18 21:20:09 INFO streaming.StreamJob: To kill this job, run:
12/11/18 21:20:09 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201211181903_0009
12/11/18 21:20:09 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201211181903_0009
12/11/18 21:20:10 INFO streaming.StreamJob: map 0% reduce 0%
12/11/18 21:20:24 INFO streaming.StreamJob: map 67% reduce 0%
12/11/18 21:20:33 INFO streaming.StreamJob: map 100% reduce 0%
12/11/18 21:20:36 INFO streaming.StreamJob: map 100% reduce 22%
12/11/18 21:20:45 INFO streaming.StreamJob: map 100% reduce 100%
12/11/18 21:20:51 INFO streaming.StreamJob: Job complete: job_201211181903_0009
12/11/18 21:20:51 INFO streaming.StreamJob: Output: /home/hduser/VectMult3/out4
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /home/hduser/VectMult3/out4/part-00000
Warning: $HADOOP_HOME is deprecated.
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /home/hduser/VectMult3/out4/
Warning: $HADOOP_HOME is deprecated.
Found 3 items
-rw-r--r-- 1 hduser supergroup 0 2012-11-18 22:05 /home/hduser/VectMult3/out4/_SUCCESS
drwxr-xr-x - hduser supergroup 0 2012-11-18 22:05 /home/hduser/VectMult3/out4/_logs
-rw-r--r-- 1 hduser supergroup 0 2012-11-18 22:05 /home/hduser/VectMult3/out4/part-00000
しかし、出力を確認すると、0 バイトの空のファイルしか見つかりません。
何がうまくいかなかったのかわかりません。誰でも助けることができますか?
編集:@DiJuMxへの応答
これを修正する 1 つの方法は、map から一時ファイルに出力し、その一時ファイルを reduce で使用することです。
Hadoop でこれが許可されているかどうかわかりませんか? うまくいけば、よく知っている人がこれについて私を訂正してくれるでしょう。
これを試みる前に、データを処理せずにそのまま渡すだけの単純なバージョンを作成してみてください。
データが正しく流れていることを確認するためだけに、これは良い考えだと思いました。これには以下を使用しました。
mapper.py と reducer.py の両方が
sys をインポートします
for i in sys.stdin:
print i,
出てくるものは、入ったものとまったく同じでなければなりません。それでも、空のファイルが出力されます。
または、既存のコードを reduce で編集して、入力が空白の場合に (エラー) メッセージを出力ファイルに出力します。
mapper.py
import sys
for i in sys.stdin:
print "mapped",
print "mapper",
reducer.py
import sys
for i in sys.stdin:
print "reduced",
print "reducer",
入力が受信された場合、最終的に出力する必要がありますreduced
。いずれにせよ、少なくとも出力する必要がありますreducer
。実際の出力はまだ空のファイルです。