4

サーバー間にイベントベースのシステムをセットアップしたいと考えています。たとえば、データベース ロジックをラップするサーバーの状態が変化したときに、他のサーバーに通知するようにしたいと考えています。これにはパブリッシュ/サブスクライブの設計が理想的で、ZeroRPC について良いことを聞いたことがあります。

zerorpc ストリーミングを使用して pub/sub を実現することに言及している人もいますが、ストリーミングを使用してイベントを起動する方法は明らかではありません。

4

2 に答える 2

10

dotCloud では、多くの pub/sub トラフ zerorpc ストリーミングを使用しています。私たちのやり方を説明しましょう。

要約すれば

@zeropc.stream で装飾されたストリーミング メソッドを公開します。このメソッドが呼び出されると、gevent.queue がセットに追加されます。その後、メソッドは永遠にループし、キューに到着するすべてのメッセージを生成します。このメソッドが終了すると (クライアントが切断されたため)、キューはセットから削除されます。

パブリッシュするには、セットに登録されているすべてのキューにパブリッシュするメッセージを投稿するだけです。この時点で、遅いコンシューマに対して何をしたいかを決定する必要があります (コンシューマを切断する、特定の制限までキューに入れる、および/または新しいメッセージを破棄する)。

zerorpc-python での実装例:

サブスクライブ部分

class MyService(object):
    def __init__(self):
        self._subscribers = set()

    @zerorpc.stream
    def subscribe(self):
        try:
            queue = gevent.queue.Queue()
            self._subscribers.add(queue)
            for msg in queue:
                yield msg
        finally:
            self._subscribers.remove(queue)

subscribe メソッドは、イベント キューをセットに追加するだけです。次に、次のいずれかになるまでキューを永久に消費します。 - キューが StopIteration メッセージによって終了される (gevent.queue.Queue のドキュメントを参照) - サブスクライブ関数を実行している greenlet が強制終了される (通常、クライアントが切断されたため)

どちらの場合も、finally ステートメントが実行され、サブスクライバーのリストからキューが削除されます。

この時点でキューのサイズを制限できることに注意してください...Queue(maxsize=42)

出版部分

class MyService(object):
    [...]

    def _publish(self, msg):
        for queue in self._subscribers:
            if queue.size < 42:
                queue.put(msg)

メッセージを発行するには、このメソッドを呼び出します。メッセージを入れるために、すべてのサブスクライバーのキューを反復処理します。私の例では、キューが特定のサイズに達すると、メッセージを破棄します。しかし、そこに適用したいパターンの種類に制限はありません。

サブスクライバの greenlet インスタンスをセットに格納し、キューがいっぱいになったらそれを強制終了して、遅いクライアントを効果的に切断できます (クライアントに遅すぎることを知らせるメッセージを送信することもできます)。_publish などから戻る前に、すべてのコンシューマーがメッセージを並行して処理するのを待つこともできます。

それが役立つことを願っています!

于 2012-08-20T23:02:25.323 に答える
2

現在、ZeroRPC には完全に異なるパブリッシャー/サブスクライバー機能のセットがあり、ネットワーク上で完全に機能します!

ZeroRPC テストを読んで、ZeroRPC テストの使用方法に関するヒントを確認することに興味があるかもしれません。この場合は、Publisher クラスと Subscriber クラスです。ここにテストがあります。

また、パブリッシャ/サブスクライバ パターンなどに関する ØMQ ドキュメントには、多くの優れた情報があります。ここで見つけることができます。

于 2012-12-13T10:19:42.783 に答える