3

プロセス間で共有できないタイプの大きなオブジェクトがあります。それをインスタンス化し、そのデータを操作するメソッドがあります。

私がやっている現在の方法は、最初にメインの親プロセスでオブジェクトをインスタンス化し、次に何らかのイベントが発生したときにサブプロセスに渡すことです。問題は、サブプロセスが実行されるたびに、時間がかかるたびにオブジェクトをメモリにコピーすることです。オブジェクトの関数を呼び出すたびにコピーする必要がないように、彼らだけが利用できるメモリに保存したいと思います。

そのプロセスで使用するためだけにオブジェクトを保存するにはどうすればよいですか?

編集:コード

class MultiQ:
    def __init__(self):
        self.pred = instantiate_predict() #here I instantiate the big object

    def enq_essay(self,essay):
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        predictions = self.pred.predict_fields(essay) #computation in the large object that doesn't modify the object

これにより、大きなオブジェクトが毎回メモリにコピーされます。私はそれを避けようとしています。

編集 4: 20 個のニュースグループ データで実行される短いコード サンプル

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle


def get_20newsgroups_fnames():
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
        if i>0:
            all_files.extend([os.path.join(root,file) for file in files])
    return all_files

documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def predict(large_object, essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    y = np.random.random_integers(0,1,19997)
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)


def report_memory(label):
    f = free_memory()
    logger.warn('{l:<25}: {f}'.format(f=f, l=label))

def dump_large_object(large_object):
    f = open("large_object.obj", "w")
    pickle.dump(large_object, f, protocol=2)
    f.close()

def load_large_object():
    f = open("large_object.obj")
    large_object = pickle.load(f)
    f.close()
    return large_object

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')

出力 1:

19:01:39: [ MainProcess] Initial                  : 26585728
19:01:51: [ MainProcess] After train_and_model    : 25958924
19:01:51: [ MainProcess] After Process            : 25958924
19:01:51: [ MainProcess] After p.start            : 25925908
19:01:51: [   Process-1] done                     : 25725524
19:01:51: [   Process-2] done                     : 25781076
19:01:51: [   Process-4] done                     : 25789880
19:01:51: [   Process-3] done                     : 25802032
19:01:51: [ MainProcess] After p.join             : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M    large_object.obj

おそらく、大きなオブジェクトは大きくなく、私の問題は tfidf ベクトライザーの変換メソッドからのメモリ使用量にありました。

メインメソッドをこれに変更すると:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')

これらの結果が得られます: 出力 2:

20:07:23: [ MainProcess] Initial                  : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process            : 26544380
20:07:23: [ MainProcess] After p.start            : 26523268
20:07:24: [   Process-1] done                     : 26338012
20:07:24: [   Process-4] done                     : 26337268
20:07:24: [   Process-3] done                     : 26439444
20:07:24: [   Process-2] done                     : 26438948
20:07:24: [ MainProcess] After p.join             : 26542860

次に、メインメソッドを次のように変更しました。

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')

そして、これらの結果を得ました: 出力 3:

20:13:34: [ MainProcess] Initial                  : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done                     : 26513804
20:13:35: [ MainProcess] After Process            : 26513804

この時点では何が起こっているのかわかりませんが、マルチプロセッシングは間違いなくより多くのメモリを使用します。

4

2 に答える 2

2

Linux はcopy-on-write を使用します。これは、サブプロセスがフォークされると、値が変更されるまで、各サブプロセスのグローバル変数が同じメモリ アドレスを共有することを意味します。値が変更された場合にのみ、値がコピーされます。

したがって、理論的には、大きなオブジェクトが変更されていなければ、メモリを消費することなくサブプロセスで使用できます。その理論を検証してみましょう。

これがあなたのコードで、メモリ使用量のログが少し追加されています。

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging

logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def predict(essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    N = 100000
    corpus = [
        'This is the first document.',
        'This is the second second document.',
        'And the third one.',
        'Is this the first document?', ] * N
    y = [1, 0, 1, 0] * N
    report_memory('Before fit_transform')
    X = tfidf_vect.fit_transform(corpus)
    model = lm.LogisticRegression()
    model.fit(X, y)
    report_memory('After model.fit')
    return (tfidf_vect, model)


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def gen_change_in_memory():
    f = free_memory()
    diff = 0
    while True:
        yield diff
        f2 = free_memory()
        diff = f - f2
        f = f2
change_in_memory = gen_change_in_memory().next

def report_memory(label):
    logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label))

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict) for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    report_memory('After p.join')

次の結果が得られます。

21:45:01: [ MainProcess] Initial                  : +0
21:45:01: [ MainProcess] Before fit_transform     : +3224
21:45:12: [ MainProcess] After model.fit          : +153572
21:45:12: [ MainProcess] After train_and_model    : -3100
21:45:12: [ MainProcess] After Process            : +0
21:45:12: [   Process-1] done                     : +2232
21:45:12: [   Process-2] done                     : +2976
21:45:12: [   Process-3] done                     : +3596
21:45:12: [   Process-4] done                     : +3224
21:45:12: [ MainProcess] After p.join             : -372

報告される数値は、空きメモリ (キャッシュとバッファを含む) の KiB の変化です。したがって、たとえば、「初期」と「train_and_model 後」の間の空きメモリの変化は約 150MB でした。したがって、 にlarge_objectは約 150MB が必要です。

次に、4 つのサブプロセスが完了すると、はるかに少ない量のメモリ (合計で約 12MB) が消費されます。消費されるメモリは、サブプロセスの作成に加えてtransformpredictメソッドによって使用されるメモリが原因である可能性があります。

したがって、large_objectがコピーされていないように見えます。コピーされていれば、消費されるメモリが約 150MB 増加するはずだったからです。


20 のニュースグループでの実行についてのコメント:

空きメモリの変化は次のとおりです。

20 ニュースグループのデータ:

| Initial               |       0 |
| After train_and_model |  626804 | <-- Large object requires 627M
| After Process         |       0 |
| After p.start         |   33016 |
| done                  |  200384 | 
| done                  |  -55552 |
| done                  |   -8804 |
| done                  |  -12152 |
| After p.join          | -156240 |

そのため、ラージ オブジェクトのインスタンス化には 627MB が必要なようです。done最初のサイズに達した後、さらに 200 MB 以上消費された理由はわかりません。

load_large_object の使用:

| Initial                  |       0 |
| After loading the object |   33976 |
| After Process            |       0 |
| After p.start            |   21112 |
| done                     |  185256 |
| done                     |     744 |
| done                     | -102176 |
| done                     |     496 |
| After p.join             | -103912 |

どうやら、large_object 自体は 34MB しか必要とせず、残りのメモリは 627-34 = 593MB が で呼び出されたfit_transformおよびfitメソッドによって消費されたに違いありませんtrain_and_model

単一プロセスの使用:

| Initial                  |     0 |
| After loading the object | 34224 |
| done                     | 24552 |
| After Process            |     0 |

これはもっともらしい。

したがって、蓄積されたデータは、大きなオブジェクト自体が各サブプロセスによってコピーされていないという主張をサポートしているようです。しかし、新たな謎が生じます。「After p.start」と最初の「done」の間にメモリが大量に消費されるのはなぜですか。その答えはわかりません。


report_memory電話をかけてみることもできます

vectorized_essay = large_object[0].transform(essay)

large_object[1].predict(vectorized_essay)

余分なメモリが消費されている場所を確認します。私の推測では、これらの scikit-learn メソッドの 1 つが、この (比較的) 大量のメモリを割り当てることを選択していると思います。

于 2013-01-21T19:42:17.517 に答える