プロセス間で共有できないタイプの大きなオブジェクトがあります。それをインスタンス化し、そのデータを操作するメソッドがあります。
私がやっている現在の方法は、最初にメインの親プロセスでオブジェクトをインスタンス化し、次に何らかのイベントが発生したときにサブプロセスに渡すことです。問題は、サブプロセスが実行されるたびに、時間がかかるたびにオブジェクトをメモリにコピーすることです。オブジェクトの関数を呼び出すたびにコピーする必要がないように、彼らだけが利用できるメモリに保存したいと思います。
そのプロセスで使用するためだけにオブジェクトを保存するにはどうすればよいですか?
編集:コード
class MultiQ:
def __init__(self):
self.pred = instantiate_predict() #here I instantiate the big object
def enq_essay(self,essay):
p = Process(target=self.compute_results, args=(essay,))
p.start()
def compute_results(self, essay):
predictions = self.pred.predict_fields(essay) #computation in the large object that doesn't modify the object
これにより、大きなオブジェクトが毎回メモリにコピーされます。私はそれを避けようとしています。
編集 4: 20 個のニュースグループ データで実行される短いコード サンプル
import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle
def get_20newsgroups_fnames():
all_files = []
for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
if i>0:
all_files.extend([os.path.join(root,file) for file in files])
return all_files
documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True
def free_memory():
"""
Return free memory available, including buffer and cached memory
"""
total = 0
with open('/proc/meminfo', 'r') as f:
for line in f:
line = line.strip()
if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
field, amount, unit = line.split()
amount = int(amount)
if unit != 'kB':
raise ValueError(
'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
total += amount
return total
def predict(large_object, essay="this essay will be predicted"):
"""this method copies the large object in memory which is what im trying to avoid"""
vectorized_essay = large_object[0].transform(essay)
large_object[1].predict(vectorized_essay)
report_memory("done")
def train_and_model():
"""this is very similar to the instantiate_predict method from my first code sample"""
tfidf_vect = ftext.TfidfVectorizer()
X = tfidf_vect.fit_transform(documents)
y = np.random.random_integers(0,1,19997)
model = lm.LogisticRegression()
model.fit(X, y)
return (tfidf_vect, model)
def report_memory(label):
f = free_memory()
logger.warn('{l:<25}: {f}'.format(f=f, l=label))
def dump_large_object(large_object):
f = open("large_object.obj", "w")
pickle.dump(large_object, f, protocol=2)
f.close()
def load_large_object():
f = open("large_object.obj")
large_object = pickle.load(f)
f.close()
return large_object
if __name__ == '__main__':
report_memory('Initial')
tfidf_vect, model = train_and_model()
report_memory('After train_and_model')
large_object = (tfidf_vect, model)
procs = [mp.Process(target=predict, args=(large_object,))
for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
p.start()
report_memory('After p.start')
for p in procs:
p.join()
report_memory('After p.join')
出力 1:
19:01:39: [ MainProcess] Initial : 26585728
19:01:51: [ MainProcess] After train_and_model : 25958924
19:01:51: [ MainProcess] After Process : 25958924
19:01:51: [ MainProcess] After p.start : 25925908
19:01:51: [ Process-1] done : 25725524
19:01:51: [ Process-2] done : 25781076
19:01:51: [ Process-4] done : 25789880
19:01:51: [ Process-3] done : 25802032
19:01:51: [ MainProcess] After p.join : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M large_object.obj
おそらく、大きなオブジェクトは大きくなく、私の問題は tfidf ベクトライザーの変換メソッドからのメモリ使用量にありました。
メインメソッドをこれに変更すると:
report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
p.start()
report_memory('After p.start')
for p in procs:
p.join()
report_memory('After p.join')
これらの結果が得られます: 出力 2:
20:07:23: [ MainProcess] Initial : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process : 26544380
20:07:23: [ MainProcess] After p.start : 26523268
20:07:24: [ Process-1] done : 26338012
20:07:24: [ Process-4] done : 26337268
20:07:24: [ Process-3] done : 26439444
20:07:24: [ Process-2] done : 26438948
20:07:24: [ MainProcess] After p.join : 26542860
次に、メインメソッドを次のように変更しました。
report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')
そして、これらの結果を得ました: 出力 3:
20:13:34: [ MainProcess] Initial : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done : 26513804
20:13:35: [ MainProcess] After Process : 26513804
この時点では何が起こっているのかわかりませんが、マルチプロセッシングは間違いなくより多くのメモリを使用します。