pika の使用にこだわらないのであれば、kombu を使ってあなたがやろうとしていることを実現するうえで、このスレッドが私の役に立ちました:
#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after
# Patch the standard library so blocking calls cooperate with eventlet's
# green threads.
# NOTE(review): this runs after kombu and the other modules above have been
# imported; eventlet's documentation recommends patching as early as
# possible -- confirm that the current ordering is intentional.
eventlet.monkey_patch()
# Column-aligned log layout: level, timestamp, logger name, function name,
# line number, then the message itself.
log_format = ' '.join((
    '%(levelname) -10s',
    '%(asctime)s',
    '%(name) -30s',
    '%(funcName) -35s',
    '%(lineno) -5d:',
    '%(message)s',
))
logging.basicConfig(format=log_format, level=logging.INFO)

# Dedicated logger for the worker, pinned to INFO.
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)
def long_running_function(body, duration=300):
    """Stand in for a slow job by blocking for ``duration`` seconds.

    Args:
        body: The message payload. This placeholder does not inspect it.
        duration: Number of seconds to sleep. Defaults to 300, the value
            that was previously hard-coded, so existing callers that pass
            only ``body`` behave as before.
    """
    time.sleep(duration)
def job_worker(body, message):
    """Consumer callback: run the job for ``body``, then ack ``message``.

    The acknowledgement is sent only after the job has returned, so a job
    that raises leaves the message unacknowledged.
    """
    long_running_function(body)
    message.ack()
def monitor_heartbeats(connection, rate=2):
    """Schedule recurring heartbeat checks to keep the connection alive.

    This keeps the RabbitMQ connection open while a long-running job is
    being processed.

    Args:
        connection: Connection object; its ``heartbeat`` and ``connected``
            attributes are read and its ``heartbeat_check`` method is called.
        rate: How many checks to run per heartbeat interval. It is passed to
            ``heartbeat_check`` and also determines the delay between checks
            (``heartbeat / rate`` seconds).

    Returns:
        The value returned by ``spawn_after`` for the first scheduled check,
        or ``None`` when the connection has no heartbeat configured.
    """
    heartbeat = connection.heartbeat
    if not heartbeat:
        logger.info("No heartbeat set for connection: %s", heartbeat)
        return None

    # heartbeat_check(rate=N) expects to be invoked N times per heartbeat
    # interval, so the delay between checks is heartbeat / rate rather than
    # the full heartbeat. A non-positive rate falls back to one check per
    # interval instead of dividing by zero.
    if rate and rate > 0:
        interval = heartbeat / float(rate)
    else:
        interval = heartbeat

    # Hold only a weak reference so the scheduled checks do not keep the
    # connection object alive on their own.
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        # Stop rescheduling once the connection is gone or disconnected.
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)

    return spawn_after(interval, heartbeat_check)
def main():
    """Connect to the broker and consume jobs from ``job_queue`` until interrupted.

    Opens a connection with a 300-second heartbeat, starts the heartbeat
    monitor, and then drives the event loop with ``job_worker`` as the
    consumer callback. A ``KeyboardInterrupt`` is logged as a shutdown
    rather than propagated.
    """
    setup_logging(loglevel='INFO')
    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                # Take one message at a time; each job is long-running.
                consumer.qos(prefetch_count=1)
                # Timeouts are ignored so the loop keeps polling while idle.
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")
# Start the worker only when this file is executed as a script.
if __name__ == '__main__':
    main()
ドメイン固有のコードを削除しましたが、基本的にこれが私が使用するフレームワークです。