44

サブプロセスから値を返そうとしていますが、残念ながらこれらの値はピクルできません。そのため、スレッドモジュールでグローバル変数を使用して成功しましたが、マルチプロセッシングモジュールを使用しているときにサブプロセスで行われた更新を取得できませんでした。何かが足りないことを願っています。

最後に出力される結果は、varsdataDV03およびが指定された初期値と常に同じdataDV04です。サブプロセスはこれらのグローバル変数を更新していますが、これらのグローバル変数は親で変更されていません。

import multiprocessing

# NOT ABLE to get python to return values in passed variables.

ants = ['DV03', 'DV04']
dataDV03 = ['', '']
dataDV04 = {'driver': '', 'status': ''}


def getDV03CclDrivers(lib):  # call global variable
    global dataDV03
    dataDV03[1] = 1
    dataDV03[0] = 0

# eval( 'CCL.' + lib + '.' +  lib + '( "DV03" )' ) these are unpicklable instantiations

def getDV04CclDrivers(lib, dataDV04):   # pass global variable
    dataDV04['driver'] = 0  # eval( 'CCL.' + lib + '.' +  lib + '( "DV04" )' )


if __name__ == "__main__":

    jobs = []
    if 'DV03' in ants:
        j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',))
        jobs.append(j)

    if 'DV04' in ants:
        j = multiprocessing.Process(target=getDV04CclDrivers, args=('LORR', dataDV04))
        jobs.append(j)

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()

    print 'Results:\n'
    print 'DV03', dataDV03
    print 'DV04', dataDV04

質問に投稿できないので、オリジナルを編集してみます。

pickle 化できないオブジェクトは次のとおりです。

In [1]: from CCL import LORR
In [2]: lorr=LORR.LORR('DV20', None)
In [3]: lorr
Out[3]: <CCL.LORR.LORR instance at 0x94b188c>

これは、インスタンスを親に戻すために multiprocessing.Pool を使用すると返されるエラーです。

Thread getCcl (('DV20', 'LORR'),)
Process PoolWorker-1:
Traceback (most recent call last):
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/pool.py", line 71, in worker
put((job, i, result))
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/queues.py", line 366, in put
return send(obj)
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
In [5]: dir(lorr)
Out[5]:
['GET_AMBIENT_TEMPERATURE',
 'GET_CAN_ERROR',
 'GET_CAN_ERROR_COUNT',
 'GET_CHANNEL_NUMBER',
 'GET_COUNT_PER_C_OP',
 'GET_COUNT_REMAINING_OP',
 'GET_DCM_LOCKED',
 'GET_EFC_125_MHZ',
 'GET_EFC_COMB_LINE_PLL',
 'GET_ERROR_CODE_LAST_CAN_ERROR',
 'GET_INTERNAL_SLAVE_ERROR_CODE',
 'GET_MAGNITUDE_CELSIUS_OP',
 'GET_MAJOR_REV_LEVEL',
 'GET_MINOR_REV_LEVEL',
 'GET_MODULE_CODES_CDAY',
 'GET_MODULE_CODES_CMONTH',
 'GET_MODULE_CODES_DIG1',
 'GET_MODULE_CODES_DIG2',
 'GET_MODULE_CODES_DIG4',
 'GET_MODULE_CODES_DIG6',
 'GET_MODULE_CODES_SERIAL',
 'GET_MODULE_CODES_VERSION_MAJOR',
 'GET_MODULE_CODES_VERSION_MINOR',
 'GET_MODULE_CODES_YEAR',
 'GET_NODE_ADDRESS',
 'GET_OPTICAL_POWER_OFF',
 'GET_OUTPUT_125MHZ_LOCKED',
 'GET_OUTPUT_2GHZ_LOCKED',
 'GET_PATCH_LEVEL',
 'GET_POWER_SUPPLY_12V_NOT_OK',
 'GET_POWER_SUPPLY_15V_NOT_OK',
 'GET_PROTOCOL_MAJOR_REV_LEVEL',
 'GET_PROTOCOL_MINOR_REV_LEVEL',
 'GET_PROTOCOL_PATCH_LEVEL',
 'GET_PROTOCOL_REV_LEVEL',
 'GET_PWR_125_MHZ',
 'GET_PWR_25_MHZ',
 'GET_PWR_2_GHZ',
 'GET_READ_MODULE_CODES',
 'GET_RX_OPT_PWR',
 'GET_SERIAL_NUMBER',
 'GET_SIGN_OP',
 'GET_STATUS',
 'GET_SW_REV_LEVEL',
 'GET_TE_LENGTH',
 'GET_TE_LONG_FLAG_SET',
 'GET_TE_OFFSET_COUNTER',
 'GET_TE_SHORT_FLAG_SET',
 'GET_TRANS_NUM',
 'GET_VDC_12',
 'GET_VDC_15',
 'GET_VDC_7',
 'GET_VDC_MINUS_7',
 'SET_CLEAR_FLAGS',
 'SET_FPGA_LOGIC_RESET',
 'SET_RESET_AMBSI',
 'SET_RESET_DEVICE',
 'SET_RESYNC_TE',
 'STATUS',
 '_HardwareDevice__componentName',
 '_HardwareDevice__hw',
 '_HardwareDevice__stickyFlag',
 '_LORRBase__logger',
 '__del__',
 '__doc__',
 '__init__',
 '__module__',
 '_devices',
 'clearDeviceCommunicationErrorAlarm',
 'getControlList',
 'getDeviceCommunicationErrorCounter',
 'getErrorMessage',
 'getHwState',
 'getInternalSlaveCanErrorMsg',
 'getLastCanErrorMsg',
 'getMonitorList',
 'hwConfigure',
 'hwDiagnostic',
 'hwInitialize',
 'hwOperational',
 'hwSimulation',
 'hwStart',
 'hwStop',
 'inErrorState',
 'isMonitoring',
 'isSimulated']

In [6]:
4

5 に答える 5

51

を使用multiprocessingして 2 番目のプロセスを開くと、Python のまったく新しいインスタンスが作成され、独自のグローバル状態が作成されます。そのグローバル状態は共有されないため、子プロセスによってグローバル変数に加えられた変更は、親プロセスには見えません。

さらに、multiprocessing提供する抽象化のほとんどは pickle を使用してデータを転送します。プロキシを使用して転送されるすべてのデータは pickleable でなければなりませんManagerが提供するすべてのオブジェクトが含まれます。関連する引用(私の強調):

プロキシのメソッドへの引数が pickle 化可能であることを確認してください。

そして(Managerセクションで):

他のプロセスは、プロキシを使用して共有オブジェクトにアクセスできます。

Queueまた、pickleable データも必要です。ドキュメントにはそうは書かれていませんが、簡単なテストで確認できます。

import multiprocessing
import pickle

class Thing(object):
    def __getstate__(self):
        print 'got pickled'
        return self.__dict__
    def __setstate__(self, state):
        print 'got unpickled'
        self.__dict__.update(state)

q = multiprocessing.Queue()
p = multiprocessing.Process(target=q.put, args=(Thing(),))
p.start()
print q.get()
p.join()

出力:

$ python mp.py 
got pickled
got unpickled
<__main__.Thing object at 0x10056b350>

本当にデータをピクルできない場合に有効なアプローチの 1 つは、データをctypeオブジェクトとして格納する方法を見つけることです。その後、メモリへの参照を子プロセスに渡すことができます。これはかなり怪しいと思います。私はそれをしたことがありません。しかし、それはあなたにとって可能な解決策かもしれません。

あなたの更新を考えると、の内部についてもっと知る必要があるようですLORRLORRクラスですか?それからサブクラス化できますか?他の何かのサブクラスですか?そのMROは何ですか?LORR.__mro__(動作する場合は、出力を試して投稿してください。) 純粋な python オブジェクトの場合は、サブクラス化して、 a__setstate__と aを作成して__getstate__酸洗を有効にすることができる場合があります。

LORRもう 1 つのアプローチは、関連するデータをインスタンスから取得し、単純な文字列を介して渡す方法を見つけ出すことです。あなたは本当にオブジェクトのメソッドを呼び出したいだけだと言っているので、Queues を使用してメッセージをやり取りしてみませんか? 言い換えれば、次のようなものです(概略的に):

Main Process              Child 1                       Child 2
                          LORR 1                        LORR 2 
child1_in_queue     ->    get message 'foo'
                          call 'foo' method
child1_out_queue    <-    return foo data string
child2_in_queue                   ->                    get message 'bar'
                                                        call 'bar' method
child2_out_queue                  <-                    return bar data string
于 2012-06-15T18:39:33.197 に答える
7

@DBlas は回答で簡単な URL と Manager クラスへの参照を提供しますが、まだ少しあいまいだと思うので、適用されていることを確認するだけでも役立つと思います...

import multiprocessing
from multiprocessing import Manager

ants = ['DV03', 'DV04']

def getDV03CclDrivers(lib, data_dict):  
    data_dict[1] = 1
    data_dict[0] = 0

def getDV04CclDrivers(lib, data_list):   
    data_list['driver'] = 0  


if __name__ == "__main__":

    manager = Manager()
    dataDV03 = manager.list(['', ''])
    dataDV04 = manager.dict({'driver': '', 'status': ''})

    jobs = []
    if 'DV03' in ants:
        j = multiprocessing.Process(
                target=getDV03CclDrivers, 
                args=('LORR', dataDV03))
        jobs.append(j)

    if 'DV04' in ants:
        j = multiprocessing.Process(
                target=getDV04CclDrivers, 
                args=('LORR', dataDV04))
        jobs.append(j)

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()

    print 'Results:\n'
    print 'DV03', dataDV03
    print 'DV04', dataDV04

マルチプロセッシングは実際には個別のプロセスを使用するため、グローバル変数を単純に共有することはできません。それらはメモリ内の完全に異なる「スペース」にあるからです。あるプロセスでグローバルに対して行うことは、別のプロセスには反映されません。あなたの見方からすると混乱しているように見えることは認めますが、すべてが同じコードの中にあるのに、「これらのメソッドがグローバルにアクセスできないのはなぜですか」? それらが異なるプロセスで実行されるという考えに頭を悩ませるのは難しい.

Manager クラスは、プロセス間で情報をやり取りできるデータ構造のプロキシとして機能するように指定されています。あなたがすることは、マネージャーから特別な辞書とリストを作成し、それらをメソッドに渡し、ローカルで操作することです。

pickle 化できないデータ

特化した LORR オブジェクトの場合、インスタンスの選択可能な状態を表すことができるプロキシのようなものを作成する必要がある場合があります。

非常に堅牢ではなく、あまりテストされていませんが、アイデアは得られます。

class LORRProxy(object):

    def __init__(self, lorrObject=None):
        self.instance = lorrObject

    def __getstate__(self):
        # how to get the state data out of a lorr instance
        inst = self.instance
        state = dict(
            foo = inst.a,
            bar = inst.b,
        )
        return state

    def __setstate__(self, state):
        # rebuilt a lorr instance from state
        lorr = LORR.LORR()
        lorr.a = state['foo']
        lorr.b = state['bar']
        self.instance = lorr
于 2012-06-15T18:33:46.380 に答える
6

を使用multiprocessする場合、プロセス間でオブジェクトを渡す唯一の方法は、QueueまたはPipe;を使用することです。グローバルは共有されません。オブジェクトはピクルス可能でなければならないので、multiprocessここでは役に立ちません。

于 2012-06-15T18:24:15.237 に答える
5

multiprocessing Arrayを使用することもできます。これにより、プロセス間で状態を共有できるようになり、おそらくグローバル変数に最も近いものになります。

main の先頭で、Array を宣言します。最初の引数 'i' は、整数になることを示しています。2 番目の引数は、初期値を示します。

shared_dataDV03 = multiprocessing.Array ('i', (0, 0)) #a shared array

次に、この配列を引数としてプロセスに渡します。

j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',shared_dataDV03))

呼び出される関数で配列引数を受け取る必要があり、関数内でそれを変更できます。

def getDV03CclDrivers(lib,arr):  # call global variable
    arr[1]=1
    arr[0]=0

配列は親と共有されるため、親の最後に値を出力できます。

print 'DV03', shared_dataDV03[:]

そして、変更が表示されます:

DV03 [0, 1]
于 2016-09-08T07:14:22.843 に答える
1

p.map() を使用して、多数のプロセスをリモート サーバーにスピンオフし、予期しない時間に戻ってきたときに結果を出力します。

Servers=[...]
from multiprocessing import Pool
p=Pool(len(Servers))
p.map(DoIndividualSummary, Servers)

これを結果にDoIndividualSummary使用すると問題なく動作printしましたが、全体的な結果は予測できない順序であり、解釈が困難でした。グローバル変数を使用する方法をいくつか試しましたが、問題が発生しました。最後に、sqlite3 で成功しました。

の前p.map()に、sqlite 接続を開き、テーブルを作成します。

import sqlite3
conn=sqlite3.connect('servers.db') # need conn for commit and close
db=conn.cursor()
try: db.execute('''drop table servers''')
except: pass
db.execute('''CREATE TABLE servers (server text, serverdetail text, readings     text)''')
conn.commit()

次に、 から戻るときにDoIndividualSummary()、結果をテーブルに保存します。

db.execute('''INSERT INTO servers VALUES (?,?,?)''',         (server,serverdetail,readings))
conn.commit()
return

map()ステートメントの後に、結果を出力します。

db.execute('''select * from servers order by server''')
rows=db.fetchall()
for server,serverdetail,readings in rows: print serverdetail,readings

やり過ぎのように思えるかもしれませんが、推奨される解決策よりも簡単でした。

于 2014-10-14T11:14:23.087 に答える