2

私はrabbitmqに最新のpikaライブラリ(0.9.9+)を使用しています。私のrabbitmqとpikaの使用法は次のとおりです。

  1. 私は労働者として長時間(約5分)のタスクを実行しています。これらのタスクはrabbitmqからリクエストを受け取ります。リクエストは非常にまれにしか発生しません。つまり、リクエスト間に長いアイドル時間があります。
  2. 私が以前直面していた問題は、アイドル接続(アイドル接続による接続の閉鎖)に関連しています。だから、私はピカでハートビートを有効にしました。
  3. ここで、ハートビートの選択が問題になります。Pikaは、ハートビートの受信と確認応答がリクエストの時間枠の間に行われるシングルスレッドライブラリのようです。
  4. したがって、ハートビート間隔が、コールバック関数が長時間実行される計算を実行するために使用する時間よりも短く設定されている場合、サーバーはハートビート確認応答を受信せず、接続を閉じます。
  5. したがって、最小ハートビート間隔は、ブロッキング接続でのコールバック関数の最大計算時間であると想定しています。

アイドル状態の接続を閉じるのを防ぐために、Amazon ec2の適切なハートビート値は何でしょうか?

また、tcp接続を維持するためにrabbitmqキープアライブ(またはlibkeepalive)を使用することを提案する人もいます。アプリケーションがハートビートを管理する必要がないため、tcpレイヤーでハートビートを管理する方がはるかに優れていると思います。これは本当ですか?RMQハートビートと比較した場合、キープアライブは良い方法ですか?

長時間実行されるタスクに複数のスレッドとキューを使用することを提案する人もいます。しかし、これは長時間実行されるタスクの唯一のオプションですか?このシナリオで別のキューを使用する必要があるのは非常に残念です。

前もって感謝します。問題を詳しく説明したと思います。詳細をお知らせいただければお知らせください。

4

1 に答える 1

4

ナキウサギの使用に縛られていない場合、このスレッドは、昆布を使用してあなたがやろうとしていることを達成するのに役立ちました:

#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after

eventlet.monkey_patch()

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


def long_running_function(body):
    time.sleep(300)

def job_worker(body, message):
    long_running_function(body)
    message.ack()

def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
       connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
        return
    interval = connection.heartbeat
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)
    return spawn_after(interval, heartbeat_check)

def main():
    setup_logging(loglevel='INFO')

    # process for heartbeat monitor
    p = None

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")

if __name__ == "__main__":
    main()

ドメイン固有のコードを削除しましたが、基本的にこれが私が使用するフレームワークです。

于 2013-05-13T16:39:49.550 に答える