1

Twitter のストリーミング API への接続を作成するタスクを (celery.task.Task をサブクラス化して) 作成しています。Twitter API 呼び出しには、tweepy を使用しています。セロリのドキュメントから読んだように、「タスクはリクエストごとにインスタンス化されるのではなく、グローバル インスタンスとしてタスク レジストリに登録されます。」タスクに対して apply_async (または遅延) を呼び出すたびに、最初にインスタンス化されたタスクにアクセスすることを期待していましたが、それは起こりません。代わりに、カスタム タスク クラスの新しいインスタンスが作成されます。tweepy API 呼び出しによって作成された元の接続を終了できる唯一の方法であるため、元のカスタム タスクにアクセスできる必要があります。

これが役立つ場合のコードは次のとおりです。

from celery import registry
from celery.task import Task

class FollowAllTwitterIDs(Task):
    def __init__(self):
        # requirements for creation of the customstream
        # goes here. The CustomStream class is a subclass
        # of tweepy.streaming.Stream class

        self._customstream = CustomStream(*args, **kwargs)

    @property
    def customstream(self):
        if self._customstream:
            # terminate existing connection to Twitter
            self._customstream.running = False
        self._customstream = CustomStream(*args, **kwargs)

    def run(self):
        self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()

        self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]

そしてDjangoビューの場合

def connect_to_twitter(request):
    if request.method == 'POST':
        do_stuff_here()
        .
        .
        .

        follow_all_twitterids.apply_async(args=[], kwargs={})

     return

どんな助けでも大歓迎です。:D

編集:

質問の追加コンテキストとして、filter() メソッドが呼び出されるたびに CustomStream オブジェクトが httplib.HTTPSConnection インスタンスを作成します。この接続は、別の接続を作成しようとするたびに閉じる必要があります。customstream.running を False に設定すると、接続が閉じられます。

4

1 に答える 1

0

タスクは一度だけインスタンス化する必要があります。何らかの理由でインスタンス化されていないと思われる場合は、追加することをお勧めします

print("INSTANTIATE") インポート トレースバック traceback.print_stack()

これによりTask.__init__、これがどこで発生するかがわかります。

あなたの仕事は次のように表現する方が良いと思います:

from celery.task import Task, task

class TwitterTask(Task):
    _stream = None
    abstract = True

    def __call__(self, *args, **kwargs):
        try:
            return super(TwitterTask, self).__call__(stream, *args, **kwargs)
        finally:
            if self._stream:
                self._stream.running = False

    @property
    def stream(self):
        if self._stream is None:
            self._stream = CustomStream()
        return self._stream

@task(base=TwitterTask)
def follow_all_ids():
    ids = get_list_of_ids_to_follow()
    follow_all_ids.stream.filter(follow=ids, async=false)
于 2011-10-26T15:47:29.403 に答える