0

私のマルチプロセッシング コードでは、並列処理用に複数のワーカーを使用しています。

ワーカーは、request_queue とロック付きの共有値を介してのみ通信する必要があります。

しかし、例えばワーカー #4 と #5 が同じ辞書ドキュメントを共有しているなど、開始時の「フォーク」が原因であると思われます。id(document) を使用してメモリアドレスを確認したことがわかりました。

ドライバが _id をドキュメントに書き戻す mongodb にドキュメントを保存するため、奇妙なエラーが表示されます。

キューと共有値を期待して、すべてのワーカーが完全に分離されていることを確認する必要がありますが、今のところ方法はわかりません。

ワーカーは次のように開始されます。

for i in range(workers):
    Worker( request_queue,i,val, lock ).start()

class Worker(Process):
 def __init__(self, queue,ident,val,lock):
    super(Worker, self).__init__()

    self.queue= queue
    self.idstr= str(ident)
    self.val = val
    self.lock = lock
    dbconn = dbconnector.DBConnector()
    self.mongoconnection = dbconn.getMongoConnection()
    self.flagController = FlagController()
    print "Ident" + self.idstr

 def run(self):
    print 'Worker started'
    # do some initialization here

    print 'Worker Loop!'
    #time.sleep(5)
    try:
        for data in iter( self.queue.get, None ):
            mid = data["_id"]
            print "#" + self.idstr + " :  Mongoid " + str(mid)
            #time.sleep(5)
            try:

            timestamp = time.time()


            document = {"rawdata": data,
                                            "c": {
                                            "quelle": "t",
                                            "timestamp": mid.generation_time,
                                            "query" :  data["query"]                            
                                            }
                                            }

                    self.mongoconnection.insert("productive","input",document)

更新 Constructor を介して新しいドキュメントを渡し、self.document を介して Worker 内で使用しようとしましたが、残念ながら役に立ちません。

4

1 に答える 1