here で説明されているプロキシ メソッドを使用して、複数の処理で既存のオブジェクトを共有しようとしています。私のマルチプロセッシングのイディオムは、こちらの 4 番目の例をモデルにしたワーカー/キューのセットアップです。
コードは、ディスク上のかなり大きなファイルに格納されているデータに対して計算を行う必要があります。すべての I/O 相互作用をカプセル化するクラスがあり、ディスクからファイルを読み取ると、次にタスクが同じデータを使用する必要があるときのためにデータをメモリに保存します (これは頻繁に発生します)。
上記にリンクされている例を読んで、すべてが機能していると思いました。これは、numpy ランダム配列を使用してディスク I/O をモデル化するコードのモックアップです。
import numpy
from multiprocessing import Process, Queue, current_process, Lock
from multiprocessing.managers import BaseManager
nfiles = 200
njobs = 1000
class BigFiles:
def __init__(self, nfiles):
# Start out with nothing read in.
self.data = [ None for i in range(nfiles) ]
# Use a lock to make sure only one process is reading from disk at a time.
self.lock = Lock()
def access(self, i):
# Get the data for a particular file
# In my real application, this function reads in files from disk.
# Here I mock it up with random numpy arrays.
if self.data[i] is None:
with self.lock:
self.data[i] = numpy.random.rand(1024,1024)
return self.data[i]
def summary(self):
return 'BigFiles: %d, %d Storing %d of %d files in memory'%(
id(self),id(self.data),
(len(self.data) - self.data.count(None)),
len(self.data) )
# I'm using a worker/queue setup for the multprocessing:
def worker(input, output):
proc = current_process().name
for job in iter(input.get, 'STOP'):
(big_files, i, ifile) = job
data = big_files.access(ifile)
# Do some calculations on the data
answer = numpy.var(data)
msg = '%s, job %d'%(proc, i)
msg += '\n Answer for file %d = %f'%(ifile, answer)
msg += '\n ' + big_files.summary()
output.put(msg)
# A class that returns an existing file when called.
# This is my attempted workaround for the fact that Manager.register needs a callable.
class ObjectGetter:
def __init__(self, obj):
self.obj = obj
def __call__(self):
return self.obj
def main():
# Prior to the place where I want to do the multprocessing,
# I already have a BigFiles object, which might have some data already read in.
# (Here I start it out empty.)
big_files = BigFiles(nfiles)
print 'Initial big_files.summary = ',big_files.summary()
# My attempt at making a proxy class to pass big_files to the workers
class BigFileManager(BaseManager):
pass
getter = ObjectGetter(big_files)
BigFileManager.register('big_files', callable = getter)
manager = BigFileManager()
manager.start()
# Set up the jobs:
task_queue = Queue()
for i in range(njobs):
ifile = numpy.random.randint(0, nfiles)
big_files_proxy = manager.big_files()
task_queue.put( (big_files_proxy, i, ifile) )
# Set up the workers
nproc = 12
done_queue = Queue()
process_list = []
for j in range(nproc):
p = Process(target=worker, args=(task_queue, done_queue))
p.start()
process_list.append(p)
task_queue.put('STOP')
# Log the results
for i in range(njobs):
msg = done_queue.get()
print msg
print 'Finished all jobs'
print 'big_files.summary = ',big_files.summary()
# Shut down the workers
for j in range(nproc):
process_list[j].join()
task_queue.close()
done_queue.close()
main()
これは、すべてを正しく計算し、途中で読み取ったデータをキャッシュしているという意味で機能します。私が抱えている唯一の問題は、最後に big_files オブジェクトにファイルがロードされていないことです。最終的に返されるメッセージは次のとおりです。
Process-2, job 999. Answer for file 198 = 0.083406
BigFiles: 4303246400, 4314056248 Storing 198 of 200 files in memory
しかし、すべてが完了すると、次のようになります。
Finished all jobs
big_files.summary = BigFiles: 4303246400, 4314056248 Storing 0 of 200 files in memory
だから私の質問は、保存されたすべてのデータに何が起こったのですか? ID(self.data)に従って同じself.dataを使用していると主張しています。でも今は空いています。
実際にはこのプロセス全体を何度も繰り返す必要があるため、big_files の最終状態には、途中で蓄積されたすべての保存データが必要です。したがって、すべての (遅い) I/O をやり直す必要はありません。毎回。
ObjectGetter クラスと何か関係があるに違いないと思います。BaseManager の使用例は、既存のオブジェクトを共有するのではなく、共有される新しいオブジェクトを作成する方法のみを示しています。既存の big_files オブジェクトを取得する方法に何か問題がありますか? このステップを実行するためのより良い方法を提案できる人はいますか?
どうもありがとう!