Pythonでウサギmqの次の作業プロデューサー消費者コードを作成しました。しかし、私にはひねりがあります。コンシューマーは 0.5 秒ごとにデータをキューに入れ続けていますが、コンシューマーが 3 秒ごとにウェイクアップし、パブリッシャーがキューに入れた 6 つのデータすべてを取得し、再び 3 秒間スリープさせたいと考えています。私はこれを無限ループに入れたいと思っています。
しかし、ウサギのmqでこれをどのように達成できるかわかりません
プロデューサー
import pika
import time
import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
value=str(int(time.time()))
for i in range (1000):
channel.basic_publish(exchange='',routing_key='hello',body='{"action": "print", "method": "onData", "data": "Madan Mohan"}')
time.sleep(0.5)
connection.close()
消費者
#!/usr/bin/env python
import pika
import time
import json
import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
#print " current time: %s " % (str(int((time.time())*1000)))
d=json.loads(body)
print d
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()