4

0mqを使用してプロセス間の双方向通信を確立するための最も正しい方法は何ですか?メインプロセスからのコマンドを待機し、いくつかの計算を実行して結果をメインプロセスに戻す、いくつかのバックグラウンドプロセスを作成する必要があります。

4

2 に答える 2

7

これを行うにはいくつかの方法があります。最も簡単なアプローチは、REQ/REPソケットを使用することです。各バックグラウンドプロセス/ワーカーにはソケットがあり、REPソケットを使用REQしてそれらと通信します。

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

メッセージを送信して結果を返すには、各ワーカーに接続する必要があります。

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

別のアプローチは、PUB/SUBを使用してワーカーへのメッセージを送信したり、PUSH/PULL結果を収集したりすることです。

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

特定のワーカーにコマンドをブロードキャストするには、次のようにします。

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

結果を引き出す:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result
于 2011-08-24T17:24:01.077 に答える
3

Request Reply Brokerの使用を検討しますが、REQソケットをDEALERに交換します。DEALERは送信をブロックせず、ワーカーへのトラフィックを自動的に負荷分散します。

写真でClientはあなたmain processService A/B/Cあなたbackground processes (workers)です。Main processエンドポイントにバインドする必要があります。Workers作業項目を受け取るには、メインプロセスのエンドポイントに接続する必要があります。

main process作業項目のリストを保持し、時間を送信します。しばらくの間回答がない場合は、workerおそらく死亡したため、作業項目を再送信してください。

リクエストリプライブローカー

于 2011-08-24T17:17:35.297 に答える