0

マルチスレッドのデータ共有に inproc zmq ソケットを使用しています。1 つのマスター スレッドに対して複数のスレッド クライアントがあります。各クライアントには PUSH ソケットがあり、マスターにはすべてのクライアントのシンクとして機能する PULL ソケットがあります。ほとんどの場合、各クライアントは独立していますが、1 つのクライアント スレッドがかなり特殊であるため、多少の順序要件があります。

私の問題のバリエーションを示すコードを次に示します。

import threading
import zmq

context = zmq.Context()

pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')

def slave():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send('>>> '+x)
    #push.send('END')
    push.close()

def master():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send(x)

    x = threading.Thread(target=slave)
    x.start()

    while x.is_alive():
        pass

    push.send('END')
    push.close()

thread = threading.Thread(target=master)
thread.start()

while True:
    if pull.poll():
        x = pull.recv()
        if x == 'END':
            print 'END - exiting'
            break
        print x

マスタースレッドがメインペイロードを送信した後にスレーブスレッドが完全に開始されるという事実により、スレーブのデータの前にマスターのすべてのデータを期待するようになりました。ただし、一貫してそうであるとは限りません。次の出力を考えてみましょう (実際には、順序は一貫していませんが、実際にはこの順序になっています)。

$ python zmq_threads.py 
one
two
>>> one
three
>>> two
END - exiting

私は次の順序を確実に望んでおり、送信に関する限り、この順序はマスター/スレーブの配置によって強制されていると思います

$ python zmq_threads.py 
one
two
three
>>> one
>>> two
>>> three
END - exiting

考えてみると、複数のソケット クライアントがこの種の同期を約束しないことがわかります。ただし、recv の順序を強制するために何らかの方法で何かをフラッシュできる必要があるように感じます (特に inproc トランスポートの場合)。何か案は?

4

1 に答える 1