0

Pythonでウサギmqの次の作業プロデューサー消費者コードを作成しました。しかし、私にはひねりがあります。コンシューマーは 0.5 秒ごとにデータをキューに入れ続けていますが、コンシューマーが 3 秒ごとにウェイクアップし、パブリッシャーがキューに入れた 6 つのデータすべてを取得し、再び 3 秒間スリープさせたいと考えています。私はこれを無限ループに入れたいと思っています。

しかし、ウサギのmqでこれをどのように達成できるかわかりません

プロデューサー

import pika
import time
import datetime

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
value=str(int(time.time()))
for i in range (1000):
    channel.basic_publish(exchange='',routing_key='hello',body='{"action": "print", "method": "onData", "data": "Madan Mohan"}')
    time.sleep(0.5)

connection.close()

消費者

#!/usr/bin/env python
import pika
import time
import json
import datetime


connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    #print " current time: %s "  % (str(int((time.time())*1000)))
    d=json.loads(body)
    print d

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()
4

1 に答える 1

2

コールバックでスリープを使用する最初のソリューション。しかし、basic_consume は可能な限り高速に (非同期で) メッセージを取得することを目的としているため、おそらくこれは適切な解決策ではありません。

got = 0

def callback(ch, method, properties, body):
    #print " current time: %s "  % (str(int((time.time())*1000)))
    d=json.loads(body)
    print d
    got = got + 1
    if got == 6
        got = 0
        time.sleep(3)

channel.basic_getを使用します。メッセージを同期的にフェッチする方が適切なソリューションです。

got = 0

while True
    channel.basic_get(callback,
                      queue='hello',
                      no_ack=True)
    got = got + 1
    if got == 6
        got = 0
        time.sleep(3)
于 2013-11-11T11:36:00.720 に答える