8

pika.BlockingConnectionメッセージごとにいくつかのタスクを実行するコンシューマーで使用しています。また、すべてのタスクを完全に実行した後にコンシューマーが適切に終了するように、シグナル処理も追加しました。

メッセージが処理され、シグナルが受信されている間"signal received"、関数から取得するだけですが、コードは終了しません。そこで、コールバック関数の最後に受信したシグナルもチェックすることにしました。問題は、このコードにはさらに多くの関数があるため、信号を何回チェックするかです。無理をせずにシグナルを処理するより良い方法はありますか?

import signal
import sys
import pika
from time import sleep

received_signal = False
all_over = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()

def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"

    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)

try:
    print ' [*] Waiting for messages. To exit press CTRL+C'
    mq_channel.basic_consume(callback, queue='test')
    mq_channel.start_consuming()
except Exception:
    mq_channel.close()
    exit()

これは私の最初の質問です。詳細が必要な場合はお知らせください。

4

1 に答える 1

4

これはあなたが探しているものだと思います:

#!/usr/bin/python

import signal
import sys 
import pika
from contextlib import contextmanager

received_signal = False
processing_callback = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
    if not processing_callback:
         sys.exit()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@contextmanager
def block_signals():
    global processing_callback
    processing_callback = True
    try:
        yield
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()

def callback(ch, method, properties, body):
    with block_signals:
        print body
        sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't.
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"

if __name__ == "__main__":    
    try:
        mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        mq_channel = mq_connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        mq_channel.basic_consume(callback, queue='test')
        mq_channel.start_consuming()
    except Exception as e:
        mq_channel.close()
        sys.exit()

コンテキスト マネージャーを使用して信号のブロックを処理し、すべてのロジックがコールバック自体の外に隠されるようにしました。これにより、コードの再利用も容易になります。それがどのように機能するかを明確にするために、これはこれと同等です:

def callback(ch, method, properties, body):
    global processing_callback
    processing_callback = True
    try:
        print body
        sum(xrange(0, 200050000))
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()
于 2014-05-09T15:47:19.307 に答える