6

この上から髪を伸ばしています。zeromq と gevent を使用した最も単純な例を取得しようとしています。PUB/SUB ソケットを使用するようにこのスクリプトを変更しました。これを実行すると、「サーバー」ソケットが永遠にループします。gevent.sleep(0.1) 行のコメントを外すと、期待どおりに動作し、他の緑色のスレッド (この場合はクライアント) に譲ります。

問題は、スリープ コールを手動で追加する必要があるのはなぜですか? zmq.green バージョンの zmq をインポートすると、送信呼び出しと受信呼び出しがブロックされておらず、その下でタスクの切り替えが行われると思いました。

つまり、この例を機能させるために gevent.sleep() 呼び出しを追加する必要があるのはなぜでしょうか? Jeff Lindsey の元の例では、彼は REQ/REP ソケットを実行しており、スリープ コールを追加する必要はありません... しかし、これを PUB/SUB に変更したとき、処理のためにクライアントに渡すために必要です。

#Notes: Code taken from slide: http://www.google.com/url?sa=t&rct=j&q=zeromq%20gevent&source=web&cd=27&ved=0CFsQFjAGOBQ&url=https%3A%2F%2Fraw.github.com%2Fstrangeloop%2F2011-slides%2Fmaster%2FLindsay-DistributedGeventZmq.pdf&ei=JoDNUO6OIePQiwK8noHQBg&usg=AFQjCNFa5g9ZliRVoN_yVH7aizU_fDMtfw&bvm=bv.1355325884,d.cGE
#Jeff Lindsey talk on gevent and zeromq

import gevent
from gevent import spawn
import zmq.green as zmq

context = zmq.Context()

def serve():
    print 'server online'
    socket = context.socket(zmq.PUB)
    socket.bind("ipc:///tmp/jeff")
    while True:
        print 'send'
        socket.send("World")
        #gevent.sleep(0.1)

def client():
    print 'client online'
    socket = context.socket(zmq.SUB)
    socket.connect("ipc:///tmp/jeff")
    socket.setsockopt(zmq.SUBSCRIBE, '') 
    while True:
        print 'recv'
        message = socket.recv()


cl = spawn(client)
server = spawn(serve)

print 'joinall'
gevent.joinall([cl, server])


print 'end'
4

1 に答える 1

6

zmq.green バージョンの zmq をインポートすると、送信呼び出しと受信呼び出しがブロックされておらず、その下でタスクの切り替えが行われると思いました。

zmq.green は、これらの呼び出しがブロックされる場合にのみ生成されます。準備ができている場合は生成されません (待つ必要はありません)。あなたの場合、送信者は常に準備ができているので、譲歩する理由はありません。

いくつかのポインタ:

  • 最小の明示的な利回りはgevent.sleep(0)です。有限である必要はありません。
  • zmq.green は、ブロッキング呼び出しでのみ生成されます。つまり、要求されたときにソケットが常に送信/受信の準備ができている場合、ソケットは決して譲歩しません。
  • socket.sendソケットが送信する準備ができていない場合にのみブロックします ( not (socket.events & zmq.POLLOUT))。実際には PUB ソケットには当てはまりません (PUSH、DEALER などの HWM で確認できます)。
  • 一般に、send to yield を信頼しないでください。zeromq の動作方法により、構成の容量を超えていない限り、これはほとんど当てはまりません。
  • send とは異なり、recvは通常の使用法では定期的にブロックするため、ほとんどの呼び出しで譲歩します。しかし、ピアが着信バッファーをあふれさせている場合、recv 呼び出しを繰り返しても、受信する準備ができなくなるまで解放されないため、飢餓を防ぐために、時々明示的に解放する必要がある場合があります。

zmq.green は send/recv を次のように変換します。

try:
    socket.send(msg, zmq.NOBLOCK) # or recv
except zmq.ZMQError as e:
    if e.errno == zmq.EAGAIN:
        yield # and wait for socket to be ready, then try again

そのため、NOBLOCK を指定した send/recv が常に成功している場合、ソケットは解放されません。

別の言い方をすれば、ソケットに待機するものがなければ、ソケットは待機しません。

于 2012-12-16T21:38:02.847 に答える