3

これはここからのフォローアップクエストです。

Django 1.3.1、セロリ2.2.7、python2.6。

私は次のものを持っていますfruits/models.py

モデルを考えてみましょう。

class Fruit(models.Model):
    name = models.CharField(max_length=50)

    def __unicode__(self):
        return self.name

そして、次のfruits/tasks.py

from django.dispatch import receiver
from django.db.models import signals

from celery.task import periodic_task, task
import fruits.models as m
import time

@task()
def check_fruit(id):
    time.sleep(2)
    try:
        fruit = m.Fruit.objects.get(pk=id)
        print "Fruit %s is found!" % fruit.name
    except m.Fruit.DoesNotExist:
        print "no such fruit"

@receiver(signals.pre_save, sender=m.Fruit, dispatch_uid="on_fruit_save")
def on_custom_feed_save(sender, instance, **kwargs):
    check_fruit.apply_async(args=[instance.id])

セロリデーモンを起動し、djangoシェルを開いて次のように入力します。

import fruits.tasks;
import fruits.models as m;
m.Fruit(name="plum").save()

質問:私はその仕事が実を結ぶことを期待しますが、決してそうはなりません。なんで?

(大規模なシステムで発生する問題をシミュレートするために、意図的に事前保存信号からタスクを起動しています)。

4

3 に答える 3

-1

今週、私はDjangoなしでCeleryを使用していたことを除いて、非常によく似た問題に遭遇しました。送信者パラメーターは、単に送信者クラス自体への参照ではなく、タスクのインスタンスが渡されたときにのみ機能することがわかりました。

問題を少し調べたところ、senderパラメーターがceleryによって使用され、特定のタスクのIDと、シグナルの登録された送信者(特定の送信者をフィルターするように設定されている)を比較していることがわかりました。

celery / utils / dispatch / signal.pyモジュールでは、以下がこの評価を実行します。

def _make_id(target):  # pragma: no cover
    if hasattr(target, 'im_func'):
        return (id(target.im_self), id(target.im_func))
    return id(target) 

最初に、いくつかのターゲットオブジェクトのIDがフェッチされます。デコレータで送信者を指定すると、次のようなタプルで保存されます。

lookup_key = (_make_id(receiver), _make_id(sender))

その後、タスクが起動すると、 _live_receiversメソッドが呼び出されます。このメソッドは、基本的に、lookup_keyで指定されたターゲット送信者が現在の送信者のIDと一致するかどうかを確認するための評価を実行します(起動タスク)。

def _live_receivers(self, senderkey):
        """Filter sequence of receivers to get resolved, live receivers.

        This checks for weak references and resolves them, then returning only
        live receivers.

        """
        none_senderkey = _make_id(None)
        receivers = []

        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == none_senderkey or r_senderkey == senderkey:
                if isinstance(receiver, WEAKREF_TYPES):
                    # Dereference the weak reference.
                    receiver = receiver()
                    if receiver is not None:
                        receivers.append(receiver)
                else:
                    receivers.append(receiver)
        return receivers

今私が経験した問題は、信号を受信したいタスクを指定したにもかかわらず、それが起動されたときに送信者キーが一致しなかったということでした。

これを正しく機能させる唯一の方法__init__は、特定のタスク自体のために構築した抽象タスククラスのメソッド内からシグナルを登録することでした。これにより、Celeryがselfタスクに登録した正確なインスタンス()を渡すことができました。

この問題が私のロジックにあるのか、シグナルがどのように機能するのかを理解しているのか、それともCeleryのバグなのかはわかりませんが、タスクインスタンスを渡すと問題が修正され、その後はすべて正常に機能しました。

注意すべき点:

  1. 私はDjangoを使用していなかったため、Django-Celery拡張機能を使用していませんでした
  2. 問題がCeleryにあるとは確信していません(ロジックを間違えたか、どこかで何かを誤解した可能性が高いです)
  3. Django-CeleryがCeleryバックエンドの動作方法をどのように変更するかわからないため、このケースがあなたにも当てはまるかどうかはわかりません。

それでも、これがお役に立てば幸いです。

幸運を!

于 2012-08-29T17:02:46.553 に答える
-1

古い質問と問題はおそらく今では解決されていますが、将来の訪問者のために...

セロリ ワーカーが実行時間の長いトランザクションを開いているように聞こえますが、コミットされていない場合、新しいオブジェクトは表示されません。タスクで DB クエリを実行する前に、これを追加してみてください。

from django.db import transaction
transaction.commit()

Django 1.6 ではトランザクション管理にいくつかの変更が加えられていますが、これを書いている時点ではまだリリースされていません。

于 2013-09-05T08:39:37.803 に答える