プロデューサーが複数のタスクを生成し、1 つ以上のコンシューマーが一度にタスクを取得して処理し、メッセージを確認する、基本的なダイレクト キュー システムをセットアップしようとしています。
問題は、処理に 10 ~ 20 分かかる場合があり、その時間にメッセージに応答しないため、サーバーが接続を切断することです。
コンシューマー向けの疑似コードを次に示します。
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
最初のタスクが完了すると、BlockingConnection の奥深くで例外がスローされ、ソケットがリセットされたことが通知されます。さらに、RabbitMQ ログは、時間内に応答しなかったためにコンシューマーが切断されたことを示しています (FIN を送信するのではなく、接続をリセットする理由は奇妙ですが、心配する必要はありません)。
これがRabbitMQの通常のユースケース(多くのコンシューマー間で分割されるべき多くの長時間実行されるタスクがある)であると信じていたので、私たちは多くのことを調べましたが、実際にこの問題を抱えている人は他にいなかったようです. long_running_task()
最後に、ハートビートを使用し、別のスレッドで生成することが推奨されているスレッドに遭遇しました。
したがって、コードは次のようになりました。
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
これは機能しているように見えますが、非常に面倒です。ch
オブジェクトがスレッドセーフであると確信していますか? さらに、long_running_task()
その接続パラメーターを使用して新しいキューにタスクを追加していると想像してください (つまり、この長いプロセスの最初の部分が完了したので、タスクを 2 番目の部分に送りましょう)。したがって、スレッドはconnection
オブジェクトを使用しています。そのスレッドセーフですか?
さらに言えば、これを行うための好ましい方法は何ですか? これは非常に面倒で、おそらくスレッドセーフではないので、正しく行っていない可能性があります。ありがとう!