私は Django 1.4 と Celery 3.0 (rabbitmq) を使用して、Twitter API 1.1 へのクエリを取得およびキャッシュするためのタスクの集合を構築しています。私が実装しようとしているのは、タスクのチェーンです。最後のタスクは、これまでの応答と最近取得した応答の応答データに基づいて、2 ノード前のタスクを再帰的に呼び出します。具体的には、これにより、アプリはユーザーのタイムラインをトラバースできます (最大 3200 ツイート)。これは、特定のリクエストが最大 200 ツイートしか生成できないことを考慮に入れています (Twitter API の制限)。
私の tasks.py の主要コンポーネントはここで確認できますが、貼り付ける前に、Python シェルから呼び出しているチェーンを示します (ただし、最終的には、最終的な Web アプリでユーザー入力を介して起動されます)。与えられた:
>>request(twitter_user_id='#1010101010101#,
total_requested=1000,
max_id = random.getrandbits(128) #e.g. arbitrarily large number)
電話する:
>> res = (twitter_getter.s(request) |
pre_get_tweets_for_user_id.s() |
get_tweets_for_user_id.s() |
timeline_recursor.s()).apply_async()
重要なことは、timeline_recursor が可変数の get_tweets_for_user_id サブタスクを開始できることです。timeline_recursor が基本ケースにある場合、ここで定義されているように応答 dict を返す必要があります。
@task(rate_limit=None)
def timeline_recursor(request):
previous_tweets=request.get('previous_tweets', None) #If it's the first time through, this will be None
if not previous_tweets:
previous_tweets = [] #so we initiate to empty array
tweets = request.get('tweets', None)
twitter_user_id=request['twitter_user_id']
previous_max_id=request['previous_max_id']
total_requested=request['total_requested']
pulled_in=request['pulled_in']
remaining_requested = total_requested - pulled_in
if previous_max_id:
remaining_requested += 1 #this is because cursored results will always have one overlapping id
else:
previous_max_id = random.getrandbits(128) # for first time through loop
new_max_id = min([tweet['id'] for tweet in tweets])
test = lambda x, y: x<y
if remaining_requested < 0: #because we overshoot by requesting batches of 200
remaining_requested = 0
if tweets:
previous_tweets.extend(tweets)
if tweets and remaining_requested and (pulled_in > 1) and test(new_max_id, previous_max_id):
request = dict(user_pk=user_pk,
twitter_user_id=twitter_user_id,
max_id = new_max_id,
total_requested = remaining_requested,
tweets=previous_tweets)
#problem happens in this part of the logic???
response = (twitter_getter_config.s(request) | get_tweets_for_user_id.s() | timeline_recursor.s()).apply_async()
else: #if in base case, combine all tweets pulled in thus far and send back as "tweets" -- to be
#saved in db or otherwise consumed
response = dict(
twitter_user_id=twitter_user_id,
total_requested = total_requested,
tweets=previous_tweets)
return response
したがって、res.result に対する私の予想される応答は、Twitter ユーザー ID、要求されたツイート数、および連続する呼び出しで取得された一連のツイートで構成される辞書です。しかし、再帰的なタスクランドではすべてがうまくいっているわけではありません。上記のチェーンを実行するときに、チェーンを開始した直後にres.statusを入力すると、「成功」と表示されますが、セロリ ワーカーのログ ビューでは、Twitter API への連鎖再帰呼び出しが行われていることがわかります。予想通り、正しいパラメータで。また、チェーンされたタスクが実行されているときでも、すぐに result.result を実行できます。res.result は AsyncResponse インスタンス ID を生成します。再帰的にチェーンされたタスクの実行が終了した後でも、res.result は AsyncResult id のままです。
一方、res.result.result.result.result['tweets'] にアクセスすると、完全なツイートのセットにアクセスできます。チェーン化されたチェーン化されたサブタスクのそれぞれが実際に発生していると推測できますが、res.result に期待される結果が得られない理由がわかりません。timeline_recursor がその基本ケースを取得したときに発生するはずの再帰的なリターンが、期待どおりに伝播していないようです。
何ができるかについて何か考えはありますか?Celery での再帰は非常に強力になりますが、少なくとも私には、Celery を利用する再帰と再帰関数をどのように考えるべきか、またこれがチェーンされたタスクの return ステートメントのロジックにどのように影響するかは完全には明らかではありません。
必要に応じて明確にさせていただきます。アドバイスをお寄せいただきありがとうございます。