Cloudera 5.5.0 を使用して単純な Hadoop マップ削減の例を実装しようとしています。マップと削減の手順は、Python 2.6.6 を使用して実装する必要があります。
問題:
- スクリプトが UNIX コマンド ラインで実行されている場合、スクリプトは完全に正常に動作し、期待される出力を生成します。
cat join2*.txt | ./join3_mapper.py | ソート | ./join3_reducer.py
- しかし、スクリプトを Hadoop タスクとして実行すると、ひどく失敗します。
Hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_mapper.py -reducer /ホーム/cloudera/join3_reducer.py -numReduceTasks 1
16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134)
at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Hadoop コマンドが -numReduceTasks 0 で実行され、hadoop ジョブがマップ ステップのみを実行し、正常に終了し、出力ディレクトリにマップ ステップからの結果ファイルが含まれている場合、マッパーは機能します。
それでは、reduceステップに何か問題があるに違いないと思いますか?
- Hue の stderr ログには、関連するものが何も表示されません。
ログのアップロード時間: Wed Jan 06 12:33:10 -0800 2016 ログの長さ: 222 log4j:WARN ロガー (org.apache.hadoop.ipc.Server) のアペンダーが見つかりませんでした。log4j:WARN log4j システムを適切に初期化してください。log4j:WARN 詳細については、http: //logging.apache.org/log4j/1.2/faq.html#noconfigを参照してください。
スクリプトのコード: 最初のファイル: join3_mapper.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip() #strip out carriage return
tuple2 = line.split(",") #split line, into key and value, returns a list
if len(tuple2) == 2:
key = tuple2[0]
value = tuple2[1]
if value == 'ABC':
print('%s\t%s' % (key, value) )
elif value.isdigit():
print('%s\t%s' % (key, value) )
2 番目のファイル: join3_reducer.py
#!/usr/bin/env python
import sys
last_key = None #initialize these variables
running_total = 0
abcFound =False;
this_key = None
# -----------------------------------
# Loop the file
# --------------------------------
for input_line in sys.stdin:
input_line = input_line.strip()
# --------------------------------
# Get Next Key value pair, splitting at tab
# --------------------------------
tuple2 = input_line.split("\t")
this_key = tuple2[0]
value = tuple2[1]
if value.isdigit():
value = int(value)
# ---------------------------------
# Key Check part
# if this current key is same
# as the last one Consolidate
# otherwise Emit
# ---------------------------------
if last_key == this_key:
if value == 'ABC': # filter for only ABC in TV shows
abcFound=True;
else:
if isinstance(value, (int,long) ):
running_total += value
else:
if last_key: #if this key is different from last key, and the previous
# (ie last) key is not empy,
# then output
# the previous <key running-count>
if abcFound:
print('%s\t%s' % (last_key, running_total) )
abcFound=False;
running_total = value #reset values
last_key = this_key
if last_key == this_key:
print('%s\t%s' % (last_key, running_total) )
入力ファイルをhadoopコマンドに宣言するさまざまな方法を試しましたが、違いも成功もありませんでした。
私は何を間違っていますか?ヒント、アイデアは大歓迎です ありがとう