編集:
主な問題は、サードパーティの rabbitmq マシンが時々アイドル状態の接続を強制終了しているように見えることです。そのとき、「壊れたパイプ」の例外が発生し始めます。通信を取得する唯一の方法。通常に戻るには、プロセスを強制終了して再起動する必要があります。もっと良い方法があると思いますか?
--
私はここで少し迷っています。メッセージをプッシュするために、サードパーティの RabbitMQ サーバーに接続しています。時々、マシン上のすべてのソケットがドロップされ、「Broken Pipe」例外が発生します。
コードにハートビート チェックを実装するように言われましたが、正確な方法がわかりません。ここでいくつかの情報を見つけました: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0しかし、実際のサンプル コードはありません。
接続文字列に「?heartbeat=x」を追加するだけでよいですか? あとは昆布がやってくれる?「x/2」で「Connection.heartbeat_check()」を呼び出す必要があるようです。これを呼び出す定期的なタスクを作成する必要がありますか? 接続はどのように再確立されますか?
私は使用しています:
- セロリ==3.0.12
- 昆布==2.5.4
私のコードは今このように見えます。単純な Celery タスクが呼び出され、メッセージがサード パーティの RabbitMQ サーバーに送信されます (ログとコメントを削除して、簡潔にするため、十分に基本的です)。
class SendMessageTask(Task):
name = "campaign.backends.send"
routing_key = "campaign.backends.send"
ignore_result = True
default_retry_delay = 60 # 1 minute.
max_retries = 5
def run(self, send_to, message, **kwargs):
payload = "Testing message"
try:
conn = BrokerConnection(
hostname=HOSTNAME,
port=PORT,
userid=USER_ID,
password=PASSWORD,
virtual_host=VHOST
)
with producers[conn].acquire(block=True) as producer:
publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
publish(
body=payload,
routing_key=OUT_ROUTING_KEY,
delivery_mode=2,
exchange=EXCHANGE,
serializer=None,
content_type='text/xml',
content_encoding = 'utf-8'
)
except Exception, ex:
print ex
助けてくれてありがとう。