0

Django、Celery、djcelery、および PeriodicTasks に苦労しています。

Adsense のレポートを取得してライブ統計レポートを生成するタスクを作成しました。これが私の仕事です:

import datetime
import httplib2
import logging

from apiclient.discovery import build
from celery.task import PeriodicTask
from django.contrib.auth.models import User
from oauth2client.django_orm import Storage

from .models import Credential, Revenue


logger = logging.getLogger(__name__)


class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)

    def run(self, *args, **kwargs):
        scraper = Scraper()
        scraper.get_report()


class Scraper(object):
    TODAY = datetime.date.today()
    YESTERDAY = TODAY - datetime.timedelta(days=1)

    def get_report(self, start_date=YESTERDAY, end_date=TODAY):
        logger.info('Scraping Adsense report from {0} to {1}.'.format(
            start_date, end_date))
        user = User.objects.get(pk=1)
        storage = Storage(Credential, 'id', user, 'credential')
        credential = storage.get()
        if not credential is None and credential.invalid is False:
            http = httplib2.Http()
            http = credential.authorize(http)
            service = build('adsense', 'v1.2', http=http)
            reports = service.reports()
            report = reports.generate(
                startDate=start_date.strftime('%Y-%m-%d'),
                endDate=end_date.strftime('%Y-%m-%d'),
                dimension='DATE',
                metric='EARNINGS',
            )
            data = report.execute()
            for row in data['rows']:
                date = row[0]
                revenue = row[1]

                try:
                    record = Revenue.objects.get(date=date)
                except Revenue.DoesNotExist:
                    record = Revenue()
                record.date = date
                record.revenue = revenue
                record.save()
        else:
            logger.error('Invalid Adsense Credentials')

セロリとRabbitMQを使用しています。ここに私の設定があります:

# Celery/RabbitMQ
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "****"
BROKER_VHOST = "myvhost"
CELERYD_CONCURRENCY = 1
CELERYD_NODES = "w1"
CELERY_RESULT_BACKEND = "amqp"
CELERY_TIMEZONE = 'America/Denver'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

import djcelery
djcelery.setup_loader()

一見するとすべてが機能しているように見えますが、ロガーをオンにして実行を観察したところ、タスクが少なくとも 4 回連続して実行されていることがわかりました。また、2 分ごとではなく、1 分ごとに実行されているようです。crontab を使用するように run_every を変更しようとしましたが、同じ結果が得られます。

スーパーバイザーを使用してセロリビートを開始しています。これが私が使用するコマンドです:

python manage.py celeryd -B -E -c 1

なぜ期待どおりに機能しないのかについてのアイデアはありますか?

ああ、もう1つ、日が変わった後も、最初に実行した日付範囲を引き続き使用します。そのため、日が進むにつれて、タスクの実行が開始された日の統計が取得され続けます。ある時点でタスクを手動で実行しない限り、最後に手動で実行した日付に変更されます。なぜこれが起こるのか誰か教えてもらえますか?

4

1 に答える 1

1

このタイプのタスク用に 1 つのワーカー プロセスと固定レートで別のキューを作成し、celerybeat から直接タスクを実行する代わりに、この新しいキューにタスクを追加することを検討してください。コードの何が問題なのか、セロリビートの問題なのか、それともタスクが予想よりも長く実行されているのかを理解するのに役立つことを願っています.

@task(queue='create_report', rate_limit='0.5/m')
def create_report():
    scraper = Scraper()
    scraper.get_report()

class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)

    def run(self, *args, **kwargs):
        create_report.delay()

settings.py で

   CELERY_ROUTES = {
     'myapp.tasks.create_report': {'queue': 'create_report'},
   }

キュー内のタスクを処理する追加のセロリワーカーを開始します

セロリ ワーカー -c 1 -Q create_report -n create_report.local

問題 2. YESTERDAY 変数と TODAY 変数がクラス レベルで設定されているため、1 つのスレッド内で 1 回だけ設定されます。

于 2013-04-10T14:56:55.480 に答える