2

ワーカーに一度に 1 つのタスクのみを実行させてからシャットダウンしようとしています。シャットダウン部分は正しく機能しています (ここにいくつかの背景があります: task_postrun シグナルで SystemExit を上げてワーカーをシャットダウンしようとしているセロリですが、常にハングし、メインプロセスは決して終了しません)、シャットダウンするとエラーが発生します:

[2013-02-13 12:19:05,689: CRITICAL/MainProcess] Couldn't ack 1, reason:AttributeError("'NoneType' object has no attribute 'method_writer'",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 104, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 99, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/channel.py", line 1742, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/abstract_channel.py", line 75, in _send_method
    self.connection.method_writer.write_method(self.channel_id,
AttributeError: 'NoneType' object has no attribute 'method_writer'

なぜこうなった?確認応答しないだけでなく、キューに残っている他のすべてのタスクも削除します (大きな問題)。

これを修正するにはどうすればよいですか?





アップデート

以下は、すべてが更新されたスタック トレースです (pip install -U kombu amqp amqplib celery):

[2013-02-13 11:58:05,357: CRITICAL/MainProcess] Internal error: AttributeError("'NoneType' object has no attribute 'method_writer'",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 372, in process_task
    req.execute_using_pool(self.pool)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 219, in execute_using_pool
    timeout=task.time_limit)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 137, in apply_async
    **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 27, in apply_target
    callback(target(*args, **kwargs))
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 333, in on_success
    self.acknowledge()
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 439, in acknowledge
    self.on_ack(logger, self.connection_errors)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 98, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 93, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1562, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 57, in _send_method
    self.connection.method_writer.write_method(
AttributeError: 'NoneType' object has no attribute 'method_writer'
4

1 に答える 1

0

task_postrun は「タスク本体」エラー処理の外で実行されるため、task_postrun で終了することはお勧めしません。

タスクが sys.exit を呼び出したときに何が起こるかは明確に定義されておらず、実際には使用されているプールに依存します。

multiprocessing を使用すると、子プロセスは単純に新しいプロセスに置き換えられます。他のプールではワーカーはシャットダウンしますが、これはマルチプロセッシング動作と一致するように変更される可能性が高いものです。

タスク本体の外で exit を呼び出すと、内部エラー (クラッシュ) と見なされます。

「タスク本体」は、実行されるものですtask.__call__()

これに対するより良い解決策は、カスタム実行戦略を使用することだと思います。

from celery.worker import strategy
from functools import wraps

@staticmethod
def shutdown_after_strategy(task, app, consumer):

    default_handler = strategy.default(task, app, consumer)

    def _shutdown_to_exit_after(fun):
        @wraps(fun)
        def _inner(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            finally:
                raise SystemExit()
       return _inner
    return _decorate_to_exit_after(default_handler)

@celery.task(Strategy=shutdown_after_strategy)
def shutdown_after():
    print('will shutdown after this')

これは正確には美しいとは言えませんが、タスクの実行を最適化し、簡単に拡張できないようにするための実行戦略があります (ワーカーは、Task.Strategy をキャッシュすることによって、各タスク タイプの実行パスを「プリコンパイル」します)。

Celery 3.1 では、「ブートステップ」を使用してワーカーとコンシューマーを拡張できるため、適切な解決策が得られる可能性があります。

于 2013-02-14T12:57:36.763 に答える