3

編集:

主な問題は、サードパーティの rabbitmq マシンが時々アイドル状態の接続を強制終了しているように見えることです。そのとき、「壊れたパイプ」の例外が発生し始めます。通信を取得する唯一の方法。通常に戻るには、プロセスを強制終了して再起動する必要があります。もっと良い方法があると思いますか?

--

私はここで少し迷っています。メッセージをプッシュするために、サードパーティの RabbitMQ サーバーに接続しています。時々、マシン上のすべてのソケットがドロップされ、「Broken Pipe」例外が発生します。

コードにハートビート チェックを実装するように言われましたが、正確な方法がわかりません。ここでいくつかの情報を見つけました: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0しかし、実際のサンプル コードはありません。

接続文字列に「?heartbeat=x」を追加するだけでよいですか? あとは昆布がやってくれる?「x/2」で「Connection.heartbeat_check()」を呼び出す必要があるようです。これを呼び出す定期的なタスクを作成する必要がありますか? 接続はどのように再確立されますか?

私は使用しています:

  • セロリ==3.0.12
  • 昆布==2.5.4

私のコードは今このように見えます。単純な Celery タスクが呼び出され、メッセージがサード パーティの RabbitMQ サーバーに送信されます (ログとコメントを削除して、簡潔にするため、十分に基本的です)。

class SendMessageTask(Task):
    name = "campaign.backends.send"
    routing_key = "campaign.backends.send"
    ignore_result = True
    default_retry_delay = 60 # 1 minute.
    max_retries = 5

    def run(self, send_to, message, **kwargs):
    payload = "Testing message"

    try:
        conn = BrokerConnection(
        hostname=HOSTNAME,
        port=PORT,
        userid=USER_ID,
        password=PASSWORD,
        virtual_host=VHOST
        )

        with producers[conn].acquire(block=True) as producer:
        publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
        publish(
            body=payload,
            routing_key=OUT_ROUTING_KEY,
            delivery_mode=2,
            exchange=EXCHANGE,
            serializer=None,
            content_type='text/xml',
            content_encoding = 'utf-8'
        )

    except Exception, ex:
        print ex

助けてくれてありがとう。

4

1 に答える 1

2

確かにプロデューサーにハートビート サポートを追加することはできますが、コンシューマー プロセスにとってはより理にかなっています。

ハートビートを有効にするということは、定期的にハートビートを送信する必要があることを意味します。たとえば、ハートビートが 1 秒に設定されている場合、1 秒以上ごとにハートビートを送信する必要があります。そうしないと、リモートが接続を閉じます。

これは、別のスレッドを使用するか、async io を使用してハートビートを時間内に確実に送信する必要があることを意味します。スレッド間で接続を共有できないため、非同期 io のままになります。

良いニュースは、プロデュースのみの接続にハートビートを追加しても、おそらくあまりメリットがないということです。

于 2013-01-29T17:16:18.380 に答える