MongoDB データベースに対して mrjob と Python を使用してマップを削減しようとしています。mongodb-hadoop コネクタには、AWS EMR を使用する方法の例がありますが、mrjob を使用する方法はありません。すべてのビットをまとめているわけではありません。mrjob.confに関する限り、私がすでに持っているものは次のとおりです。
enable_emr_debugging: true
ami_version: 3.0.4
interpreter: python2.7
upload_files:
- tweets-clean.txt
- train_model.py
python_archives:
- mrcc.py.tar.gz
setup:
#- python2.7 train_model.py
jobconf:
mongo.job.input.format : com.mongodb.hadoop.MongoInputFormat
mongo.input.uri : myserver:27017/twitter_db
stream.io.identifier.resolver : com.mongodb.streaming.io.MongoIdentifierResolver
bootstrap:
- sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++ numpy scipy
- sudo python2.7 get-pip.py#
- sudo pip2.7 install boto mrjob simplejson scikit-learn sklearn pymongo-hadoop
- python2.7 train_model.py# tweets-clean.txt#
- mongo-hadoop-bootstrap.sh#
mrjob Python マッパー/リデューサーを使用する場合、次のようなコードを使用しました。
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
def mapper(self, _, line):
words=line.split()
for word in words:
yield word, 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRWordFrequencyCount.run()
mongodb-hadoop コネクタを使用するようにこれを変更するには、次のようにします。
from pymongo_hadoop import BSONMapper
from pymongo_hadoop import BSONReducer
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
def mapper(self, _, documents):
BSONMapper(self.bsonmapper)
def reducer(self, key, values):
BSONReducer(self.bsonreducer)
def bsonmapper(documents):
for doc in documents:
yield {'_id' : doc['id']['user.id']}, {'count' : 1}
def bsonreducer(self, key, values):
count = 0
for v in values:
count += v['count']
return {'_id' : key, 'count' : count}
if __name__ == '__main__':
MRWordFrequencyCount.run()
問題は、メソッドを BSONMapper と BSONReducer に正しく渡していないことです。BSONMapper クラスは、init () で 1 つの引数を想定していますが、2 つ取得しています。