Twitter のストリーミング API への接続を作成するタスクを (celery.task.Task をサブクラス化して) 作成しています。Twitter API 呼び出しには、tweepy を使用しています。セロリのドキュメントから読んだように、「タスクはリクエストごとにインスタンス化されるのではなく、グローバル インスタンスとしてタスク レジストリに登録されます。」タスクに対して apply_async (または遅延) を呼び出すたびに、最初にインスタンス化されたタスクにアクセスすることを期待していましたが、それは起こりません。代わりに、カスタム タスク クラスの新しいインスタンスが作成されます。tweepy API 呼び出しによって作成された元の接続を終了できる唯一の方法であるため、元のカスタム タスクにアクセスできる必要があります。
これが役立つ場合のコードは次のとおりです。
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
そしてDjangoビューの場合
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
どんな助けでも大歓迎です。:D
編集:
質問の追加コンテキストとして、filter() メソッドが呼び出されるたびに CustomStream オブジェクトが httplib.HTTPSConnection インスタンスを作成します。この接続は、別の接続を作成しようとするたびに閉じる必要があります。customstream.running を False に設定すると、接続が閉じられます。