66

プロデューサーが複数のタスクを生成し、1 つ以上のコンシューマーが一度にタスクを取得して処理し、メッセージを確認する、基本的なダイレクト キュー システムをセットアップしようとしています。

問題は、処理に 10 ~ 20 分かかる場合があり、その時間にメッセージに応答しないため、サーバーが接続を切断することです。

コンシューマー向けの疑似コードを次に示します。

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

最初のタスクが完了すると、BlockingConnection の奥深くで例外がスローされ、ソケットがリセットされたことが通知されます。さらに、RabbitMQ ログは、時間内に応答しなかったためにコンシューマーが切断されたことを示しています (FIN を送信するのではなく、接続をリセットする理由は奇妙ですが、心配する必要はありません)。

これがRabbitMQの通常のユースケース(多くのコンシューマー間で分割されるべき多くの長時間実行されるタスクがある)であると信じていたので、私たちは多くのことを調べましたが、実際にこの問題を抱えている人は他にいなかったようです. long_running_task()最後に、ハートビートを使用し、別のスレッドで生成することが推奨されているスレッドに遭遇しました。

したがって、コードは次のようになりました。

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

これは機能しているように見えますが、非常に面倒です。chオブジェクトがスレッドセーフであると確信していますか? さらに、long_running_task()その接続パラメーターを使用して新しいキューにタスクを追加していると想像してください (つまり、この長いプロセスの最初の部分が完了したので、タスクを 2 番目の部分に送りましょう)。したがって、スレッドはconnectionオブジェクトを使用しています。そのスレッドセーフですか?

さらに言えば、これを行うための好ましい方法は何ですか? これは非常に面倒で、おそらくスレッドセーフではないので、正しく行っていない可能性があります。ありがとう!

4

7 に答える 7

34

今のところ、最善の策はハートビートをオフにすることです。これにより、長時間ブロックしている場合に、RabbitMQ が接続を閉じるのを防ぐことができます。バックグラウンド スレッドで動作する pika のコア接続管理と IO ループを試していますが、リリースできるほど安定していません。

pika v1.1.0 では、これはConnectionParameters(heartbeat=0)

于 2013-04-22T19:29:18.107 に答える
11

私はあなたが持っていたのと同じ問題に遭遇します。
私の解決策は次のとおりです。

  1. サーバー側でハートビートをオフにする
  2. タスクにかかる最大時間を評価する
  3. ステップ 2 で取得した時間にクライアントのハートビート タイムアウトを設定します。

なぜこれ?

次のケースでテストすると:

ケース1
  1. サーバー ハートビートのオン、1800 秒
  2. クライアントの設定解除

タスクを非常に長時間実行すると、まだエラーが発生します -- >1800

ケース2
  1. サーバーのハートビートをオフにする
  2. クライアントのハートビートをオフにする

1 つの問題を除いて、クライアント側にエラーはありません。クライアントがクラッシュした場合 (一部の障害で OS が再起動した場合)、Rabbitmq 管理プラグインで tcp 接続が引き続き表示されます。そして、それは混乱しています。

ケース3
  1. サーバーのハートビートをオフにする
  2. クライアントのハートビートをオンにし、予測される最大実行時間に設定します

この場合、個々のクライアントですべてのヒートビートを動的に変更できます。実際、頻繁にクラッシュするマシンにハートビートを設定しました。さらに、Rabbitmq Manangement プラグインを使用して、オフラインのマシンを確認できます。

環境

OS: centos x86_64
ピカ: 0.9.13
rabbitmq: 3.3.1

于 2014-05-29T11:17:49.970 に答える
0

これをスレッドで処理するもう 1 つの簡単な方法を次に示します。コンシューマ アプリが現在のジョブが終了するまで別のジョブを消費してはならない場合に特に便利です。ack はいつでも送信できます。この場合は、ジョブが完了したとき (スレッドが生きていないとき) にのみ送信することにしました。

独自のスレッドで実行時間の長いプロセスを開始し、channel.process_data_events() への呼び出しを使用してループ内でそのスレッドを監視します。メインスレッドはスレッドセーフではないため、接続オブジェクトへの参照を維持します。基本的に:

import time
import pika
from threading import Thread
from functools import partial

rmqconn = pika.BlockingConnection( ... )
rmqchan = rmqconn.channel()
rmqchan.basic_consume(
    queue='test',
    on_message_callback=partial(launch_process,rmqconn)
)
rmqchan.start_consuming()

def launch_process(conn,ch,method,properties,body):
    runthread = Thread(target=run_process,args=body)
    runthread.start()
    while runthread.is_alive():
        time.sleep(2)
        conn.process_data_events()
    ch.basic_ack(delivery_tag=method.delivery_tag)

def run_process(body):
    #do the long-running thing
    time.sleep(10)
于 2021-06-24T21:07:23.420 に答える