9

次のようにすると、すべてがローカルで正常に動作します。

cat input | python mapper.py | sort | python reducer.py

ただし、AWS Elastic Mapreduce でストリーミング MapReduce ジョブを実行すると、ジョブが正常に完了しません。途中まで実行されmapper.pyます(途中で書いたので、これを知ってstderrいます)。マッパーは「Broken Pipe」エラーによって中断されます。これは、失敗した後にタスク試行の syslog から取得できます。

java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.io.IOException: log:null
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=hadoop
HADOOP_USER=null
last Hadoop input: |null|
last tool output: |text/html    1|
Date: Mon Mar 26 07:19:05 UTC 2012
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written

ここにありmapper.pyます。デバッグ情報を提供するために stderr に書き込むことに注意してください。

#!/usr/bin/env python

import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging
        print '%s\t%s' % (web_page.header.content_type, 1)
    print >> sys.stderr, 'done' #For debugging
if __name__ == "__main__":
    main()

これは、mapper.py が実行されたときにタスク試行の stderr に表示されるものです。

text/html   1
text/html   1
text/html   1

基本的に、ループは 3 回実行され、Python がエラーをスローすることなく突然停止します。(注:何千行も出力されているはずです)。キャッチされなかった例外も stderr に表示されるはずです。

MapReduce は私のローカル コンピューターで完全に正常に動作するため、これは、mapper.py から印刷している出力を Hadoop が処理する方法に問題があると推測されます。しかし、私は問題が何であるかについては無知です。

4

3 に答える 3

8

これは受け入れられたエラーで述べられていますが、明確にしようとしましょう。必要がなくても、stdin でブロックする必要があります。これはLinux パイプと同じではないので、だまされてはいけません。直感的に何が起こるかというと、ストリーミングは実行可能ファイルを立ち上げ、「入力を取得するまでここで待ってください」と言います。Streaming が 100% の入力を送信する前に何らかの理由で実行可能ファイルが停止した場合、Streaming は次のように言います。 !」だから、ここにいくつかのpythonコードがあります。これが行うのはcatが行うことだけですが、このコードはすべての入力が処理されるまで終了しないことに注意してください。これが重要なポイントです。

#!/usr/bin/python
import sys

while True:
    s = sys.stdin.readline()
    if not s:
        break
    sys.stdout.write(s)
于 2014-08-14T16:57:26.490 に答える
1

AWS で Hadoop を使用した経験はありませんが、通常の Hadoop クラスターでも同じエラーが発生しました。私の場合、Python を開始した方法が問題でした-mapper ./mapper.py -reducer ./reducer.pyが、うまくいき-mapper python mapper.pyませんでした。

非標準のpythonパッケージも使用しているようですがwarc、必要なファイルをストリームジョブに送信しますか? -cacheFilesまたは-cacheArchive役立つ可能性があります。

于 2012-03-29T16:59:50.250 に答える