Everything works fine when I run the job locally like this:

cat data | mapper.py | sort | combiner.py | reducer.py

But when I run it on Hadoop, the combiner keeps running without sending any output to the reducer, and the job eventually gets killed.

The logs show "java.io.IOException: Bad file descriptor" and "WARN org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe".

This happens only when the job runs with the combiner. With just the mapper and the reducer it does not occur.

Mapper:

#!/usr/bin/python

import sys
import csv
import re

WORD = re.compile('[a-z]+')  # compiled once instead of on every token

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')

    for line in reader:
        # Skip the header row.
        if line[0] == "id":
            continue
        body = line[4].strip()

        # Emit "word<TAB>doc_id<TAB>1" for every token that starts with a letter.
        for token in re.split(r'\W+', body):
            word = token.lower()
            if WORD.match(word):
                print "{0}\t{1}\t{2}".format(word, line[0], 1)

mapper()

Combiner:

#!/usr/bin/python
import sys

oldKey = None
old_doc = None
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    # New word: flush the count for the previous (word, doc) pair.
    if oldKey and oldKey != thisKey:
        print oldKey, "\t", old_doc, "\t", str(doc_count)
        doc_count = 0
        old_doc = None

    oldKey = thisKey

    if old_doc is None:
        # First record for this word.
        old_doc = thisDoc
        doc_count += 1
    elif thisDoc == old_doc:
        # Same (word, doc) pair: keep counting.
        doc_count += 1
    else:
        # Same word, new doc: flush the previous pair and restart the count.
        print oldKey, "\t", old_doc, "\t", str(doc_count)
        old_doc = thisDoc
        doc_count = 1

if oldKey != None:
    print oldKey, "\t", old_doc, "\t", str(doc_count)
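
For comparison, the per-(word, doc) aggregation I intend the combiner to perform can be sketched with itertools.groupby. This is only an illustrative sketch, not the script that runs on the cluster: `combine` and `sample` are hypothetical names, it sums the third field instead of counting lines, and it writes plain tab-separated records in the same format as the mapper. I have not confirmed whether any of these differences is related to the Broken pipe error.

```python
from itertools import groupby


def combine(lines):
    """Collapse sorted 'word<TAB>doc<TAB>count' lines into one line per (word, doc)."""
    records = (line.rstrip("\n").split("\t") for line in lines)
    # Skip malformed records, as the original combiner does.
    valid = (r for r in records if len(r) == 3)
    for (word, doc), group in groupby(valid, key=lambda r: (r[0], r[1])):
        total = sum(int(r[2]) for r in group)
        yield "{0}\t{1}\t{2}".format(word, doc, total)


# Hypothetical mapper output, already sorted by word and doc id.
sample = ["hadoop\t1\t1", "hadoop\t1\t1", "hadoop\t2\t1", "python\t2\t1"]
combined = list(combine(sample))
for record in combined:
    print(record)
```

Because the output has the same shape as the input, feeding `combined` back into `combine` leaves it unchanged.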

Reducer:

#!/usr/bin/python
import sys

oldKey = None
doc_list = []
doc_count = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc, nos = data_mapped

    # New word: emit the doc list and total count for the previous word.
    if oldKey and oldKey != thisKey:
        print oldKey, "\t", sorted(doc_list), "\t", doc_count
        doc_list = []
        doc_count = 0

    oldKey = thisKey
    doc_count += int(nos)
    if int(thisDoc) not in doc_list:
        doc_list.append(int(thisDoc))

if oldKey != None:
    print oldKey, "\t", sorted(doc_list), "\t", doc_count

This problem is similar to building an inverted index. The final output should be <word, [list of docs], count>.
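
To make the target concrete, here is a minimal in-memory sketch of the output I am after. It is not the Hadoop job itself: `inverted_index` and `sample` are hypothetical names, the sample documents are made up, and the tokenization simply mirrors the mapper (split on non-word characters, lowercase, keep tokens that start with a letter).

```python
import re
from collections import defaultdict


def inverted_index(docs):
    """Map each word to (sorted list of doc ids, total occurrences).

    docs is a dict of doc id -> body text (hypothetical sample input).
    """
    postings = defaultdict(set)
    counts = defaultdict(int)
    for doc_id, body in docs.items():
        for token in re.split(r'\W+', body):
            word = token.lower()
            if re.match(r'[a-z]+', word):
                postings[word].add(doc_id)
                counts[word] += 1
    return {word: (sorted(postings[word]), counts[word]) for word in postings}


# Made-up documents, only to show the shape of the result.
sample = {1: "Hadoop streaming with Python", 2: "Python combiner in Hadoop"}
index = inverted_index(sample)
for word in sorted(index):
    doc_ids, total = index[word]
    print("{0}\t{1}\t{2}".format(word, doc_ids, total))
```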

Any help would be appreciated.
