問題
システムの 1 つのキュー ハブとして機能するRabbitMQ サーバーがあります。先週かそこらで、そのプロデューサーは数時間ごとに完全に停止します.
私は何を試しましたか
強引な
- コンシューマーを停止すると、ロックが数分間解放されますが、その後ブロックが戻ります。
- RabbitMQ を再起動すると、問題は数時間解決しました。
- 醜い再起動を行う自動スクリプトがいくつかありますが、明らかに適切な解決策にはほど遠いものです。
より多くのメモリを割り当てる
cantSleepNow の答えに従って、RabbitMQ に割り当てられたメモリを 90% に増やしました。サーバーにはなんと 16GB のメモリがあり、メッセージ数はそれほど多くない (1 日あたり数百万) ため、問題にはならないようです。
コマンドラインから:
sudo rabbitmqctl set_vm_memory_high_watermark 0.9
そして/etc/rabbitmq/rabbitmq.config
:
[
{rabbit,
[
{loopback_users, []},
{vm_memory_high_watermark, 0.9}
]
}
].
コードとデザイン
私はすべてのコンシューマーとプロデューサーに Python を使用しています。
生産者
プロデューサーは、呼び出しを処理する API サーバーです。通話が着信すると、接続が開かれ、メッセージが送信され、接続が閉じられます。
from kombu import Connection
def send_message_to_queue(host, port, queue_name, message):
"""Sends a single message to the queue."""
with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
simple_queue.put(message)
simple_queue.close()
消費者
コンシューマーはそれぞれ若干異なりますが、一般的に次のパターンを使用します - 接続を開き、メッセージが到着するまで待機します。接続は、長期間 (たとえば、数日) 開いたままにすることができます。
with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
while True:
queue = conn.SimpleQueue(queue_name)
message = queue.get(block=True)
message.ack()
設計推論
- コンシューマーは、常にキュー サーバーとの接続を開いたままにしておく必要があります。
- Producer セッションは、API 呼び出しの存続期間中のみ存続する必要があります。
この設計では、約 1 週間前まで問題は発生していませんでした。
Web ビュー ダッシュボード
Web コンソールは、 のコンシューマーが、、およびからのコンシューマー127.0.0.1
を172.31.38.50
ブロックしていることを示しています。172.31.38.50
172.31.39.120
172.31.41.38
172.31.41.38
システム指標
念のため、サーバーの負荷を確認しました。予想どおり、負荷平均と CPU 使用率の指標は低くなっています。
ウサギの MQ がそれぞれこのようなデッドロックになるのはなぜですか?