私のマルチプロセッシング コードでは、並列処理用に複数のワーカーを使用しています。
ワーカーは、request_queue とロック付きの共有値を介してのみ通信する必要があります。
しかし、例えばワーカー #4 と #5 が同じ辞書ドキュメントを共有しているなど、開始時の「フォーク」が原因であると思われます。id(document) を使用してメモリアドレスを確認したことがわかりました。
ドライバが _id をドキュメントに書き戻す mongodb にドキュメントを保存するため、奇妙なエラーが表示されます。
キューと共有値を期待して、すべてのワーカーが完全に分離されていることを確認する必要がありますが、今のところ方法はわかりません。
ワーカーは次のように開始されます。
for i in range(workers):
Worker( request_queue,i,val, lock ).start()
class Worker(Process):
def __init__(self, queue,ident,val,lock):
super(Worker, self).__init__()
self.queue= queue
self.idstr= str(ident)
self.val = val
self.lock = lock
dbconn = dbconnector.DBConnector()
self.mongoconnection = dbconn.getMongoConnection()
self.flagController = FlagController()
print "Ident" + self.idstr
def run(self):
print 'Worker started'
# do some initialization here
print 'Worker Loop!'
#time.sleep(5)
try:
for data in iter( self.queue.get, None ):
mid = data["_id"]
print "#" + self.idstr + " : Mongoid " + str(mid)
#time.sleep(5)
try:
timestamp = time.time()
document = {"rawdata": data,
"c": {
"quelle": "t",
"timestamp": mid.generation_time,
"query" : data["query"]
}
}
self.mongoconnection.insert("productive","input",document)
更新 Constructor を介して新しいドキュメントを渡し、self.document を介して Worker 内で使用しようとしましたが、残念ながら役に立ちません。