現在、私はZeroRPCを使用しています。「ワーカー」を「サーバー」に接続させ、サーバーが送信する作業を行います。
現在、FIFO キューを使用していることがわかる限り、呼び出す必要があるとすぐに ZeroRPC 経由で呼び出しが行われます。
独自のキューを使用して、通話を調整/優先順位付けしたいと考えています。
Event
ZeroRPCが、内部キューが空になったときにトリガーされる gevent を公開することを期待しています。
あなたがしたいことは、あなたのサーバーにあなた自身のワークキューを作成することです。そして、あなたが望む優先順位であなた自身に電話をかけなさい。
数行のコードが3巻のどの吸血鬼の物語よりも多くを表現しているので、サーバーがどのように見えるかを擬似コードで見てみましょう。
myqueue = MySuperBadAssQueue()
def myqueueprocessor():
for request in myqueue: # blocks until next request
gevent.spawn(request.processme) # do the job asynchronously
gevent.spawn(myqueueprocessor) # do that at startup
class Server:
def dosomething(args...blabla...): # what users are calling
request = Request(args...blabla...)
myqueue.put(request) # something to do buddy!
return request.future.get() # return when request is completed
# (can also raise an exception)
# An example of what a request could look like:
class Request:
def __init__(self, ....blablabla...):
self.future = gevent.AsyncResult()
def process():
try:
result = someworker(self.args*) # call some worker
self.future.set(result) # complete the initial request
except Exception as e:
self.future.set_exception(e)
すべてのスマートな作業を実行したり、必要に応じてスロットルしたり、必要に応じて例外を除いてリクエストをキャンセルしたりするのは、MySuperBadAssQueue次第です...
ZeroRPCは、その「内部」キューが空になったかどうかを通知するイベントを公開しません。
実際、ZeroRPCには明示的なキューはありません。何が起こるかは、単に先着順であり、正確な順序はZeroMQとGevent IOLoop(バージョンに応じてlibeventまたはlibev)の両方に依存します。実際には、これはFIFOキューのように便利に機能します。
私はこれを自分で試したことはありませんが、ソースを読みました。自分でやりたいのでモチベーションが上がります。
あなたがすることは、メソッドを継承zerorpc.Server
してオーバーライドすることのようです_acceptor
。ソースによると、_acceptor
メッセージを受信し、スレッドを生成して実行します。したがって、ロジック/ループを変更してキューを組み込むと、それを使用してスロットルすることができます。