1

私には一連のプロセスがあります。それらをA、B、Cと呼びましょう。これらは、相互に通信する必要があります。AはBおよびCと通信する必要があります。BはAおよびCと通信する必要があります。CはAおよびBと通信する必要があります。A、B、およびCは、異なるマシンまたは同じマシンに配置できます。

私の考えは、ソケットを介して通信し、それらがすべて同じマシン上にある場合は「localhost」を使用することでした(たとえば、ポート11111のA、ポート22222のBなど)。このように、非ローカルプロセスはローカルプロセスのように扱われます。そのためには、A、B、CのそれぞれにSocketServerインスタンスを設定し、それぞれが他の2つのアドレスを知っていると思いました。たとえばAからBへの通信が必要な場合は常に、AはBへのソケットを開き、データを書き込みます。次に、Bの常時稼働しているサーバーがデータを読み取り、後で必要になったときに使用できるようにリストに保存します。

私が遭遇している問題は、保存された情報がfinish_requestメソッド(リスニングを処理している)と__call__メソッド(トークを処理している)の間で共有されていないことです。(サーバークラスは、他の何かのために必要なので呼び出し可能です。それが問題に関連しているとは思いません。)

私の質問は、これは私が想像したように機能するでしょうか?、、multiprocessingおよびはすべて同じマシンthreadingsocketserver一緒にうまく機能しますか?プロセス間で通信するために他のメカニズムを使用することに興味はありません(QueueまたはなどPipe)。私はそれらで実用的な解決策を持っています。効率が悪くても、このアプローチが可能かどうか知りたいです。そして、もしそうなら、それが機能するのを妨げている私は何を間違っているのですか?

この問題を説明する最小限の例を以下に示します。

import uuid
import sys
import socket
import time
import threading
import collections
import SocketServer
import multiprocessing

class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def __init__(self, server_address, client_addresses, max_migrants=1):
        SocketServer.TCPServer.__init__(self, server_address, None)
        self.client_addresses = client_addresses
        self.migrants = collections.deque(maxlen=max_migrants)
        self.allow_reuse_address = True
        t = threading.Thread(target=self.serve_forever)
        t.daemon = True
        t.start()

    def finish_request(self, request, client_address):
        try:
            rbufsize = -1
            wbufsize = 0
            rfile = request.makefile('rb', rbufsize)
            wfile = request.makefile('wb', wbufsize)

            data = rfile.readline().strip()
            self.migrants.append(data)
            print("finish_request::  From: %d  To: %d  MID: %d  Size: %d -- %s" % (client_address[1], 
                                                                                   self.server_address[1], 
                                                                                   id(self.migrants), 
                                                                                   len(self.migrants), 
                                                                                   data))

            if not wfile.closed:
                wfile.flush()
            wfile.close()
            rfile.close()        
        finally:
            sys.exc_traceback = None

    def __call__(self, random, population, args):
        client_address = random.choice(self.client_addresses)
        migrant_index = random.randint(0, len(population) - 1)
        data = population[migrant_index]
        data = uuid.uuid4().hex
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(client_address)
            sock.send(data + '\n')
        finally:
            sock.close()
        print("      __call__::  From: %d  To: %d  MID: %d  Size: %d -- %s" % (self.server_address[1], 
                                                                               client_address[1], 
                                                                               id(self.migrants), 
                                                                               len(self.migrants), 
                                                                               data))
        if len(self.migrants) > 0:
            migrant = self.migrants.popleft()
            population[migrant_index] = migrant
        return population


def run_it(migrator, rand, pop):
    for i in range(10):
        pop = migrator(r, pop, {})
        print("        run_it::  Port: %d  MID: %d  Size: %d" % (migrator.server_address[1], 
                                                                 id(migrator.migrants), 
                                                                 len(migrator.migrants)))
        time.sleep(1)


if __name__ == '__main__':
    import random
    r = random.Random()
    a = ('localhost', 11111)
    b = ('localhost', 22222)
    c = ('localhost', 33333)
    am = NetworkMigrator(a, [b, c], max_migrants=11)
    bm = NetworkMigrator(b, [a, c], max_migrants=22)
    cm = NetworkMigrator(c, [a, b], max_migrants=33)

    fun = [am, bm, cm]
    pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]]
    jobs = []
    for f, p in zip(fun, pop):
        pro = multiprocessing.Process(target=run_it, args=(f, r, p))
        jobs.append(pro)
        pro.start()
    for j in jobs:
        j.join()
    am.shutdown()
    bm.shutdown()
    cm.shutdown()

この例の出力を見ると、次の3種類の印刷があります。

        run_it::  Port: 11111  MID: 3071227860  Size: 0
      __call__::  From: 11111  To: 22222  MID: 3071227860  Size: 0 -- e00e0891e0714f99b86e9ad743731a00
finish_request::  From: 60782  To: 22222  MID: 3071227972  Size: 10 -- e00e0891e0714f99b86e9ad743731a00

「MID」は、そのid場合のmigrants両端キューです。「From」と「To」は、送信を送受信するポートです。そして、私は今、データをランダムな16進文字列に設定しているので、個々の送信を追跡できます。

同じMIDでも、ある時点でサイズがゼロ以外と表示され、後でサイズが0と表示される理由がわかりません。呼び出しはマルチスレッドです。最後の2つのループの代わりにこれらの行を使用するforと、システムは期待どおりに機能します。

for _ in range(10):
    for f, p in zip(fun, pop):
        f(r, p, {})
        time.sleep(1)

それで、それを壊すマルチプロセッシングバージョンで何が起こっているのでしょうか?

4

1 に答える 1

2

3つの新しいNetworkMigratorオブジェクトを作成すると、3つの新しいスレッドが開始され、それぞれが新しいTCP接続をリッスンします。その後、run_it関数の3つの新しいプロセスを開始します。合計で4つのプロセスがあり、最初のプロセスには4つのスレッド(1つのメイン+ 3つのサーバー)が含まれています。ここで問題となるのは、他の3つのプロセスが、リスニングサーバースレッドによってオブジェクトに加えられた変更にアクセスできないことです。これは、プロセスがデフォルトでメモリを共有しないためです。

したがって、プロセスの代わりに3つの新しいスレッドを開始すると、違いに気付くでしょう。

pro = threading.Thread(target=run_it,args=(f,r,p))

もう1つの小さな問題があります。スレッド間のこの共有も完全に安全ではありません。オブジェクトの状態を変更するときは常にロックを使用するのが最善です。finish_requestメソッドとcallメソッドの両方で以下のようなことを行うのが最善です。

lock = Lock()
...
lock.acquire()    
self.migrants.append(data)
lock.release()

マルチスレッドに不満があり、マルチプロセッシングが必要な場合は、ここで説明されているようにプロキシオブジェクトを使用できます:http: //docs.python.org/library/multiprocessing.html#proxy-objects

オブジェクトIDが同じであるということに関しては、それは予想外ではありません。新しいプロセスは、その時点でのオブジェクトの状態(オブジェクトIDを含む)に渡されます。新しいプロセスはこれらのオブジェクトIDを保持し続けますが、これらは異なるプロセスであるため、ここでは2つの完全に異なるメモリスペースについて説明しています。したがって、メインプロセスによって行われた変更は、作成されたサブプロセスには反映されません。

于 2012-02-23T04:57:12.990 に答える