Hadoop クラスターで実行しなければならない大規模なログ処理の問題があります。タスクは、ログの各行を実行可能な「cmd」にフィードし、結果をチェックして、ログのこの行を保持するかどうかを決定することです。
「cmd」プログラムは非常に大きな辞書を開くため、ログのすべての行に対してプログラムを呼び出す余裕はありません。私はそれを実行し続け、必要な入力をそれに送りたいと思っています。私の現在のソリューションは、Pythonのサブプロセスモジュールを使用しています。コードは次のとおりです。
import sys
from subprocess import Popen, PIPE
def main():
    pp = Popen('./bqc/bqc/bqc_tool ./bqc/bqc/bqc_dict/ ./bqc/bqc/word_dict/ flag', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    for line in sys.stdin:
        lstr = line.strip().split('\t')
        if len(lstr) != 7:
            continue
        pp.stdin.write('%s\n' % lstr[5])
        pp.stdin.flush()
        out = pp.stdout.readline()
        lout = out.strip().split('\t')
        if len(lout) == 3 and lout[1] == '401':
            print line.strip()
if __name__ == '__main__':
    main()
上記のコードは、ローカル マシンからテストしたときに動作します。ジョブを Hadoop に投入する際のマッパーとして使用されます。レデューサーは使用しません。構成は次のとおりです。
hadoop streaming \
-input /path_to_input \
-output /path_to_output \
-mapper  "python/python2.7/bin/python27.sh ./mapper.py" \
-cacheArchive /path_to_python/python272.tar.gz#python \
-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \
-file ./mapper.py \
-jobconf mapred.job.name="JobName" \
-jobconf mapred.job.priority=HIGH
bqc.tar.gz のファイルは次のようになります。
bqc/
bqc/bqc_tool
bqc/bqc_dict/
bqc/word_dict/
私の意見では、「-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \」という行で tar ファイルを抽出し、bqc というフォルダーに抽出する必要があります。
ただし、hadoop クラスターに送信すると失敗し、次のエラー メッセージが表示されます。
    Traceback (most recent call last):
      File "./mapper.py", line 19, in
        main()
      File "./mapper.py", line 11, in main
        pp.stdin.write('%s\n' % lstr[5])
    IOError: [Errno 32] Broken pipe
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
        at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:152)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
        at org.apache.hadoop.mapred.Child.main(Child.java:194)
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:163)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
        at org.apache.hadoop.mapred.Child.main(Child.java:194)
誰でもアイデアを得る?どんな助けでも大歓迎です!
ありがとう!
ザカリー