5

これはあまり重要ではなく、ばかげた実験です。独自のメッセージパッシングを作成したいと思います。各キーがプロセスの PID であるキューの辞書が必要です。プロセス (Process() によって作成された) がメッセージを交換して、送信先のプロセスのキューに挿入するようにしたいためです (その pid を知っています)。これはばかげたコードです:

from multiprocessing import Process, Manager, Queue
from os import getpid
from time import sleep

def begin(dic, manager, parentQ):
    parentQ.put(getpid())
    dic[getpid()] = manager.Queue()
    dic[getpid()].put("Something...")

if __name__== '__main__':
    manager = Manager()
    dic = manager.dict()
    parentQ = Queue()

    p = Process(target = begin, args=(dic, manager, parentQ))
    p.start()
    son = parentQ.get()
    print son
    sleep(2)
    print dic[son].get()

dic[getpid()] = manager.Queue()、これはうまくいきます。しかし、実行する dic[son].put()/get()と、次のメッセージが表示されます。

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "mps.py", line 8, in begin
    dic[getpid()].put("Something...")
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod
    raise convert_to_error(kind, result)
RemoteError: 
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>)
---------------------------------------------------------------------------

正しい方法を知っていますか?

4

1 に答える 1

1

トレースバックが示すように、キューはシリアル化できないため、コードが失敗していると思います。multiprocessing.Manager() オブジェクトは、ここで行ったのと同じように問題なく共有 dict を作成できますが、dict に格納された値は依然としてシリアライズ可能 (またはPythonese でピクル可能) である必要があります。サブプロセスが互いの queuesにアクセスできないことに問題がなければ、これでうまくいくはずです:

from multiprocessing import Process, Manager, Queue
from os import getpid

number_of_subprocesses_i_want = 5

def begin(myQ):
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid()))
    return

if __name__== '__main__':
    queue_dic = {}
    queue_manager = Manager()

    process_list = []

    for i in xrange(number_of_subprocesses_i_want):
        child_queue = queue_manager.Queue()

        p = Process(target = begin, args=(child_queue,))
        p.start()
        queue_dic[p.pid] = child_queue
        process_list.append(p)

    for p in process_list:
        print(queue_dic[p.pid].get())
        p.join()

これにより、キーが子プロセスであり、値がそれぞれのキューであるディクショナリが残ります。これは、メイン プロセスから使用できます。

サブプロセスで使用するキューは、作成時にプロセスに渡す必要があるため、元の目標はキューで達成できないと思います。そのため、より多くのプロセスを起動すると、既存のプロセスにアクセスを許可する方法がありません新しいキュー。

プロセス間通信を行う 1 つの可能な方法は、全員が 1 つのキューを共有して、タプルなどのある種のヘッダーにバンドルされたメッセージをメイン プロセスに戻すことです。

(destination_pid, sender_pid, message)

..そして、メインにdestination_pidを読み取らせ、 (sender_pid、メッセージ)をそのサブプロセスのキューに送ります。もちろん、これは、新しいプロセスが通信可能になったときに既存のプロセスに通知する方法が必要であることを意味します。

于 2014-07-14T20:27:56.210 に答える