1

私はpy-amqpモジュールとPython 3.4を使用しています。複数のリスナーを実行し、1つのプロデューサーを開始してメッセージを公開すると、リスナーは1つのメッセージを受け取り、同時に処理を開始します。メッセージはDBに一度だけ書き込まれる必要があるため、そのような動作は必要ありません。そのため、最速のワーカーが DB にメッセージを書き込み、他のすべてのワーカーはそのメッセージが既に存在すると言います。

プロデューサー:

import json
import amqp
import random
from application.settings import RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE

def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    req = {"request": {"transaction_number": random.randint(100000, 9999999999)}}
    message = json.dumps(req)    
    msg = amqp.Message(message)    
    ch.basic_publish(msg, RMQ_EXCHANGE)    
    ch.close()
    conn.close()

if __name__ == '__main__':
    for x in range(100):
        main()

ワーカー:

from functools import
from pipeline import pipeline, dal
from settings import DB_CONNECTION_STRING, RMQ_EXCHANGE, RMQ_HOST, RMQ_PASSWORD, RMQ_USER    
import amqp


DB = dal.DAL(DB_CONNECTION_STRING)
message_processor = pipeline.Pipeline(DB)


def callback(channel, msg):
    channel.basic_ack(msg.delivery_tag)
    message_processor.process(msg)

    if msg.body == 'quit':
        channel.basic_cancel(msg.consumer_tag)


def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, RMQ_EXCHANGE)
    ch.basic_consume(qname, callback=partial(callback, ch))
    while ch.callbacks:
        ch.wait()
    ch.close()
    conn.close()

if __name__ == '__main__':
    print('Listener starting')
    main()

また:

user@RabbitMQ:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue   amq.gen--crTjfeSlue6gw0LRwW7pQ  []
        exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue   amq.gen-1X3vwGF5OKn_gcnofpJKFg  []
...
        exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue   amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  []
        exchange        entryapi.test   queue   entryapi.test   []
entryapi        exchange        entryapi.test   queue           []
azaza   exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue           []
azaza   exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue           []
...
azaza   exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue           []
azaza   exchange        entryapi.test   queue           []
...done.
4

1 に答える 1

2

ユースケースに対して間違ったタイプのセットアップを使用していると思います。エクスチェンジに発行するパブリッシャーがあり、メッセージを読み取って DB に書き込みたいとします。スループットを向上させるために、多くのコンシューマーが DB に書き込むことでこれを行う必要があります。ファンアウト交換はメッセージをレプリケートするため、複数のキューとコンシューマーにより、同じデータが DB に複数回書き込まれます。「ワーク キュー」を使用する必要があります。各交換は、デフォルト (タイプなし、または同じルーティング キーを使用するすべてのメッセージとの直接交換) 交換になります。交換に送信されるすべてのメッセージは、1 つの単一のキューに転送されます。各キューには複数のコンシューマーがあります。各メッセージは、コンシューマーのグループから 1 人のコンシューマーによって 1 回だけキューから読み取られ、DB に 1 回だけ書き込まれます。

詳細はこちら http://www.rabbitmq.com/tutorials/tutorial-two-python.html

于 2014-04-22T11:10:35.310 に答える