私は他のいくつかの変更ファイルを監視するコードをいくつか持っています。私がやりたいことは、別のソケットで zeromq を使用するコードを開始することです。私が今それを行っている方法では、libzmq のどこかでアサーションが失敗するようです。同じソケット。モニター クラスから新しいプロセスを作成するときに、コンテキストが再利用されないようにするにはどうすればよいですか? それが私が起こっていると思うことです。私の側に他の愚かさがあると言うことができるなら、アドバイスしてください。ここにいくつかのコードがあります:
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
class Monitor(object):
def __init(self)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.DEALER)
self.socket.connect("tcp//127.0.0.1:5055")
self.stream = ZMQStream(self._socket)
self.stream.on_recv(self.somefunc)
def initialize(self,id)
self._id = id
def somefunc(self, something)
"""work here and send back results if any """
import json
jdecoded = json.loads(something)
if self_id == jdecoded['_id']
""" good im the right monitor for you """
work = jdecoded['message']
results = algorithm (work)
self.socket.send(json.dumps(results))
else:
"""let some other process deal with it, not mine """
pass
class Prefect(object):
def __init(self, id)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.DEALER)
self.socket.bind("tcp//127.0.0.1:5055")
self.stream = ZMQStream(self._socket)
self.stream.on_recv(self.check_if)
self._id = id
self.monitors = []
def check_if(self,message):
"""find out from message's id whether we have
started a proces for it previously"""
import json
jdecoded = json.loads(message)
this_id = jdecoded['_id']
if this_id in self.monitors:
pass
else:
"""start new process for it should have its won socket """
new = Monitor()
import Process
newp = Process(target=new.initialize,args=(this_id) )
newp.start()
self.monitors.append(this_id) ## ensure its remembered
何が起こっているのかというと、すべての監視プロセスと単一のprefectプロセスが同じポートでリッスンする必要があるため、prefectがまだ見ていないリクエストを確認すると、そのプロセスを開始し、おそらく存在するすべてのプロセスもリッスンする必要がありますが、彼ら向けではないメッセージを無視します。現状では、これを行うと、おそらく何かによる同じzmqソケットの同時アクセスに関連するクラッシュが発生します(threading.threadを試しましたが、まだクラッシュします)別のスレッドによるzmqソケットの同時アクセスは不可能であることをどこかで読みました. 新しいプロセスが独自の zmq ソケットを取得するようにするにはどうすればよいですか?
編集:私のアプリの主な取引は、リクエストがzmqソケットを介して着信し、リッスンしているプロセスがメッセージに反応することです:
1. If its directed at that process judged by the _id field, do some reading on a file and reply since one of the monitors match the messages _id, if none match, then:
2 If the messages _id files is not recognized, all monitors ignore it but the Prefect creates a process to handle that _id and all future messages to that id.
3. I want all the messages to be seen by the monitor processes as well as the prefect process, seems that seems easiest,
4. All the messages are very small, avarage ~4096 bytes.
5. The monitor does some non-blocking read and for each ioloop it sends what it has found out
more-edit=>prefect プロセスがバインドされ、メッセージを受信してエコーし、モニターで表示できるようにします。これは、アーキテクチャとして私が考えていることですが、最終的なものではありません。.
すべてのメッセージは、クライアントが何を望んでいるのかをサーバーに知らせるブラウザを介してリモートユーザーから到着し、サーバーはzmqを介してメッセージをバックエンドに送信します(これは示していませんが、難しいことではありません)ので、本番環境ではバインドしない可能性があります/localhost に接続します。私が DEALER を選択したのは、どちらの方向でも asyc / 無制限のメッセージを許可し (ポイント 5 を参照)、DEALER は DEALER とバインドでき、最初の要求/応答はどちらの側からでも到着できるためです。これを行うことができるもう 1 つは、おそらく DEALER/ROUTER です。