1

私は他のいくつかの変更ファイルを監視するコードをいくつか持っています。私がやりたいことは、別のソケットで zeromq を使用するコードを開始することです。私が今それを行っている方法では、libzmq のどこかでアサーションが失敗するようです。同じソケット。モニター クラスから新しいプロセスを作成するときに、コンテキストが再利用されないようにするにはどうすればよいですか? それが私が起こっていると思うことです。私の側に他の愚かさがあると言うことができるなら、アドバイスしてください。ここにいくつかのコードがあります:

import zmq
from zmq.eventloop import ioloop
from  zmq.eventloop.zmqstream  import ZMQStream
class Monitor(object):
    def __init(self)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.connect("tcp//127.0.0.1:5055")
        self.stream =  ZMQStream(self._socket)
        self.stream.on_recv(self.somefunc)

    def initialize(self,id)
        self._id = id

    def somefunc(self, something)
        """work here and send back results if any """
        import json
        jdecoded = json.loads(something)
        if self_id == jdecoded['_id']
           """ good im the right monitor for you """
           work = jdecoded['message']
           results  = algorithm (work)
           self.socket.send(json.dumps(results))
        else:
           """let some other process deal with it, not mine """
           pass

 class Prefect(object):
    def __init(self, id)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.bind("tcp//127.0.0.1:5055")
        self.stream =  ZMQStream(self._socket)
        self.stream.on_recv(self.check_if)
        self._id = id
        self.monitors = []
    def check_if(self,message):
        """find out from message's id whether we have
            started a proces for it previously"""
        import json
        jdecoded = json.loads(message)
        this_id = jdecoded['_id']
        if this_id in self.monitors:
            pass
        else:
            """start new process for it should have its won socket """
            new = Monitor()
            import Process
            newp = Process(target=new.initialize,args=(this_id) )
            newp.start()
            self.monitors.append(this_id) ## ensure its remembered

何が起こっているのかというと、すべての監視プロセスと単一のprefectプロセスが同じポートでリッスンする必要があるため、prefectがまだ見ていないリクエストを確認すると、そのプロセスを開始し、おそらく存在するすべてのプロセスもリッスンする必要がありますが、彼ら向けではないメッセージを無視します。現状では、これを行うと、おそらく何かによる同じzmqソケットの同時アクセスに関連するクラッシュが発生します(threading.threadを試しましたが、まだクラッシュします)別のスレッドによるzmqソケットの同時アクセスは不可能であることをどこかで読みました. 新しいプロセスが独自の zmq ソケットを取得するようにするにはどうすればよいですか?

編集:私のアプリの主な取引は、リクエストがzmqソケットを介して着信し、リッスンしているプロセスがメッセージに反応することです:

1. If its directed at that process judged by the _id field, do some reading on a file and reply since one of the monitors match the messages _id, if none match, then:
2  If the messages _id files is not recognized, all monitors ignore it but the Prefect creates a process to handle that _id and all future messages to that id.
3. I want all the messages to be seen by the monitor processes as well as the prefect process, seems that seems easiest, 
4. All the messages are very small, avarage ~4096 bytes.   
5. The monitor does some non-blocking read and for each ioloop it sends what it has found out

more-edit=>prefect プロセスがバインドされ、メッセージを受信して​​エコーし、モニターで表示できるようにします。これは、アーキテクチャとして私が考えていることですが、最終的なものではありません。.

すべてのメッセージは、クライアントが何を望んでいるのかをサーバーに知らせるブラウザを介してリモートユーザーから到着し、サーバーはzmqを介してメッセージをバックエンドに送信します(これは示していませんが、難しいことではありません)ので、本番環境ではバインドしない可能性があります/localhost に接続します。私が DEALER を選択したのは、どちらの方向でも asyc / 無制限のメッセージを許可し (ポイント 5 を参照)、DEALER は DEALER とバインドでき、最初の要求/応答はどちらの側からでも到着できるためです。これを行うことができるもう 1 つは、おそらく DEALER/ROUTER です。

4

1 に答える 1

2

フォークの境界を越えて同じソケットを使い続けることはできません (マルチプロセッシングはフォークを使用します)。一般に、これが意味することは、サブプロセスが開始されるまで、フォークされたプロセスで使用されるソケットを作成したくないということです。あなたの場合、ソケットは Monitor オブジェクトの属性であるため、メインプロセスで Monitor を作成したくありません。それは次のようになります。

def start_monitor(this_id):
    monitor = Monitor()
    monitor.initialize(this_id)
    # run the eventloop, or this will return immediately and destroy the monitor

... inside Prefect.check_if():

    proc = Process(target=start_monitor, args=(this_id,))
    proc.start()
    self.monitors.append(this_id)

あなたの例ではなく、サブプロセスが行う唯一のことはIDを割り当ててからプロセスを強制終了し、最終的には何の効果もありません。

于 2013-04-13T20:49:10.847 に答える