3

私は自分のアプリでドロップボックスとセロリを組み合わせました。これにより、ユーザーがドロップボックスを接続している場合、ユーザーが自分の写真を保存できるようになります。

コードを書きましたが、これが無限ループに陥り、システムが停止するのではないかと心配しています。

私が利用している API では、一度に 60 枚の写真しか許可されず、ページネーションが提供されます。

これが私の tasks.py ファイルのコピーです。これは実際には正常に動作しますが、正しいことを行っており、システムにあまり影響を与えていないことを確認したいと思います。

class DropboxUsers(PeriodicTask):
    run_every = timedelta(hours=4)

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Collecting Dropbox users")

        dropbox_users = UserSocialAuth.objects.filter(provider='dropbox')
        for db in dropbox_users:
            ...
            ...
            ...
            sync_images.delay(first, second, third_argument)
        return True


@task(ignore_result=True)
def sync_images(token, secret, username):
    """docstring for sync_images"""
    logger = sync_images.get_logger()
    logger.info("Syncing images for %s" % username)
    ...
    ...
    ...
    ...
    feed = api.user_recent_media(user_id='self', count=60)
    images = feed[0]
    pagination = feed[1]
    for obj in images:
        ### STORE TO DROPBOX
        ...
        ...
        ...
        response = dropbox.put_file(f, my_picture, overwrite=True)
    ### CLOSE DB SESSION
    sess.unlink()
    if pagination:
        store_images.delay(first, second, third, fourth_argument)

@task(ignore_result=True)
def store_images(token, secret, username, max_id):
    """docstring for sync_images"""
    logger = store_images.get_logger()
    logger.info("Storing images for %s" % username)
    ...
    ...
    ...
    ...
    feed = api.user_recent_media(user_id='self', count=60, max_id=max_id)
    images = feed[0]
    try:
        pagination = feed[1]
    except:
        pagination = None
    for obj in images:
        ### STORE TO DROPBOX
        ...
        ...
        ...
        response = dropbox.put_file(f, my_picture, overwrite=True)
    ### CLOSE DB SESSION
    sess.unlink()
    if pagination:
        ### BASICALLY RESTART THE TASK WITH NEW ARGS
        store_images.delay(first, second, third, fourth_argument)

    return True

あなたの専門知識は大歓迎です。

4

1 に答える 1

1

大きな問題は見当たりません。また、タスクが別のタスクを開始するシステムも実装しました。

しばらくの間、サーバーの再起動時にセロリがタスクを複製するという問題がありました。同じ引数を持つ同じタスクが頻繁に実行されないようにするために、キャッシング バックエンドを使用するタスクをラップするデコレータを作成しました。無限ループに対するヘッジとして役立つ場合があります。

from django.core.cache import cache as _djcache
from django.utils.functional import wraps

class cache_task(object):

    """ Makes sure that a task is only run once over the course of a configurable
    number of seconds. Useful for tasks that get queued multiple times by accident,
    or on service restart, etc. Uses django's cache (memcache) to keep track."""

    def __init__(self, seconds=120, minutes=0, hours=0):
        self.cache_timeout_seconds = seconds + 60 * minutes + 60 * 60 * hours

    def __call__(self, task):
        task.unsynchronized_run = task.run
        @wraps(task.unsynchronized_run)
        def wrapper(*args, **kwargs):
            key = sha1(str(task.__module__) + str(task.__name__) + str(args) + str(kwargs)).hexdigest()
            is_cached = _djcache.get(key)
            if not is_cached:
                # store the cache BEFORE to cut down on race conditions caused by long tasks
                if self.cache_timeout_seconds:
                    _djcache.set(key, True, self.cache_timeout_seconds)
                task.unsynchronized_run(*args, **kwargs)
        task.run = wrapper
        return task

使用法:

@cache_task(hours=2)
@task(ignore_result=True)
def store_images(token, secret, username, max_id):
   ...
于 2012-06-08T19:04:01.023 に答える