0

私は並列処理は初めてですが、それが役立つアプリケーションがあります。~10 ~ 100k のオブジェクト インスタンス (タイプClassA) があり、マルチプロセッシング モジュールを使用して、各オブジェクトで特定のクラス メソッドを呼び出す作業を分散させたいと考えています。ほとんどのマルチプロセッシング ドキュメントと、クラス メソッドの呼び出しに関するいくつかの投稿を読みましたが、ClassA オブジェクトはすべて、別の型の同じインスタンスを指す属性を持っているという複雑な問題があります (ClassB)、自分自身または他のオブジェクトを追加/削除できます。状態の共有は同時プロセスにとって悪いことを知っているので、これが可能かどうか疑問に思っています。正直なところ、Proxy/Manager のマルチプロセッシング メソッドは、共有オブジェクトへの影響をすべて理解するのに少し頭を悩ませています。そうでない場合、これは分散プロセスを設計する際の教訓になります。

ここに私の問題の簡略版があります:

ClassA:
    def __init__(self, classB_state1, classB_state2, another_obj):
        # Pointers to shared ClassB instances
        self.state1 = classB_state1
        self.state2 = classB_state2
        self.state1.add(self)
        self.object = another_obj

    def run(classB_anothercommonpool):
        # do something to self.object
        if #some property of self.object: 
            classB_anothercommonpool.add(object)
            self.object = None

        self.switch_states()

    def switch_states(self):
        if self in self.state1: 
            self.state1.remove(self)
            self.state2.add(self)

        elif self in self.state2:
            self.state2.remove(self)
            self.state1.add(self)

        else: 
            print "State switch failed!"

ClassB(set): 
# This is essentially a glorified set with a hash so I can have sets of sets.
# If that's a bad design choice, I'd also be interested in knowing why
    def __init__(self, name):
        self.name = name
        super(ClassB, self).__init__()

    def __hash__(self):
        return id(self)

ClassC:
    def __init__(self, property):
        self.property = property

# Define an import-able function for the ClassA method, for multiprocessing
def unwrap_ClassA_run(classA_instance):
    return classA_instance.run(classB_anothercommonpool)

def initialize_states():
    global state1
    global state2
    global anothercommonpool

    state1            = ClassB("state1")
    state2            = ClassB("state2")
    anothercommonpool = ClassB("objpool")

ここで、クラスが定義されている同じ .py ファイル内で:

from multiprocessing import Pool

def test_multiprocessing():
    initialize_states()

    # There are actually 10-100k classA instances
    object1 = ClassC('iamred')  
    object2 = ClassC('iamblue')
    classA1 = ClassA(state1, state2, object1)
    classA2 = ClassA(state1, state2, object2)

    pool = Pool(processes = 2)
    pool.map(unwrap_ClassA_run, [classA1, classA2])

このモジュールをインタープリターにインポートして test_multiprocessing() を実行すると、実行時にエラーは発生しませんが、「Switch state failed!」というメッセージが表示されます。メッセージが表示され、classA1/2 オブジェクトを調べた場合、それらはそれぞれの objects1/2 を変更しておらず、どちらの ClassB 状態オブジェクトのメンバーシップも切り替えていません (したがって、ClassA オブジェクトは、state1 セットのメンバーであることを登録しません) )。ありがとう!

4

0 に答える 0