4

2015 年 8 月の更新: メッセージングを使用したい人には、現在 zeromq をお勧めします。pykka に加えて、または pykka の完全な代替として使用できます。

メッセージの RabbitMQ キューをリッスンし、Pykka 内のアクターに転送するにはどうすればよいですか?

現在、そうしようとすると、奇妙な動作が発生し、システムが停止します。

アクターを実装する方法は次のとおりです。

class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

そして、ここApplicationServiceに新しいメッセージのキューをチェックすることになっているクラス内の私のメソッドがあります:

@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()            

start_consuming無期限にブロックしているようです。自分で定期的にキューを「ポーリング」する方法はありますか?

4

1 に答える 1

3

あなたのコードはすべて私には正しいように見えます。各アクターが使用するキューを確認したい場合は、actor_inboxから返されたアクター リファレンスで利用可能なプロパティを確認できますActor#start

から継承するときに同様の問題に遭遇したので、テストするために、と を使用しEventletActorて同じコードを試しました。ソース コードからわかる限り、どちらも作業に使用しています。は私にとってはうまく機能しますが、では機能しません。EventletActorThreadingActoreventletThreadingActorEventletActorActorRef#tellActorRef#ask

以下に示すように、同じディレクトリにある 2 つのファイルから始めました。

my_actors.py: クラス名で始まるメッセージ コンテンツを出力することによって、メッセージに応答する 2 つのアクターを初期化します。

from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()

my_queue.py: pika にキューを設定し、前に設定した 2 つのアクターに転送されるキューにメッセージを送信します。各アクターにメッセージが通知された後、現在のアクターの受信トレイでキュー内の何かがチェックされます。

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

    # It is very important to stop these actors, otherwise you may lockup
    my_threading_actor_ref.stop()
    my_eventlet_actor_ref.stop()
connection.close()

出力を実行my_queue.pyすると、次のようになります。

受信メッセージ A メッセージ

ThreadingActor インボックス<Queue.Queue instance at 0x10bf55878>

MyThreadingActor を受け取りました:{'msg': 'A Message'}

EventletActor インボックス<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

キューを停止するためにヒットすると、最終的にメッセージを受信して​​出力することCTRL+Cに気付きました。EventletActor

^C受信した MyEventletActor:{'msg': 'A Message'}

これらすべてにより、にバグがある可能性があるEventletActorと思います。あなたのコードは問題なく、最初の検査ではコードに見つからなかったバグが存在すると思います。

この情報がお役に立てば幸いです。

于 2015-02-17T19:37:24.867 に答える