2

というピラミッドアプリがありmainsiteます。

このサイトは、ほとんどの場合、バックエンド操作を実行するためにビューから起動されるスレッドを通じて、かなり非同期で動作します。

sqlalchemy で mysql に接続し、ZopeTransactionExtension を使用してセッション管理を行います。

これまでのところ、アプリケーションはうまく機能しています。

その上で定期的なジョブを実行する必要があり、ビューから起動されているのと同じ非同期関数のいくつかを使用する必要があります。

私はapschedulerを使用しましたが、それで問題が発生しました。そこで、mainappをライブラリとして扱い、使用する関数をインポートする別のプロセスとしてセロリビートを使用することを考えました。

私のセロリの設定は次のようになります。

from datetime import timedelta
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
    AUTH_DELETE_TIME

BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'

CELERYBEAT_SCHEDULE = {
    'rerun': {
        'task': 'tasks.rerun_scheduler',
        'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
    },
    'automate': {
        'task': 'tasks.automation_scheduler',
        'schedule': timedelta(seconds=20)
    },
    'remove-tokens': {
        'task': 'tasks.token_remover_scheduler',
        'schedule': timedelta(seconds=2 * 24 * 3600 )
    },
}

CELERY_TIMEZONE = 'UTC'

tasks.py は

from celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')


@celery.task
def rerun_scheduler():
    from mainsite.task import check_update_rerun_tasks
    check_update_rerun_tasks()


@celery.task
def automation_scheduler():
    from mainsite.task import automate
    automate()


@celery.task
def token_remover_scheduler():
    from mainsite.auth_service import delete_old_tokens
    delete_old_tokens()

上記のすべての関数はすぐに戻りますが、必要に応じてスレッドを起動することに注意してください

スレッドは、オブジェクトを db に保存しますtransaction.commit() after session.add(object)

問題は、全体が宝石のように機能するのが約 30 分間だけであることです。その後、.ResourceClosedError: The transaction is closedがある場所ならどこでもエラーが発生し始めtransaction.commit()ます。何が問題なのかわからないので、トラブルシューティングのサポートが必要です。

タスク内にインポートする理由は、このエラーを取り除くためです。タスクを実行する必要があるたびにインポートするのは良い考えであり、毎回新しいトランザクションを取得する可能性がありますが、そうではないようです。

4

1 に答える 1

9

私の経験では、Pyramid (ZopeTransactionExtension など) で使用するように構成されたセッションを Celery ワーカーで再利用しようとすると、デバッグが非常に困難な混乱が生じます。

ZopeTransactionExtension は、SQLAlchemy セッションを Pyramid の要求応答サイクルにバインドします - トランザクションは自動的に開始され、コミットまたはロールバックされます。通常、コード内で transaction.commit() を使用することは想定されていません - すべてが問題なければ、ZTE はすべてをコミットします。コードが発生し、例外が発生すると、トランザクションはロールバックされます。

Celery では、SQLAlchemy セッションを手動で管理する必要がありますが、ZTE ではこれを行うことができないため、DBSession別の方法で構成する必要があります。

次のような単純なものが機能します。

DBSession = None

def set_dbsession(session):
    global DBSession
    if DBSession is not None:
        raise AttributeError("DBSession has been already set to %s!" % DBSession)

    DBSession = session

そして、あなたが行う Pyramid 起動コードから

def main(global_config, **settings):
    ...
    set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension())))

Celery の場合は少し複雑です。最終的に、Celery 用のカスタム開始スクリプトを作成し、セッションを構成しました。

setup.pyの中:worker

  entry_points="""
  # -*- Entry points: -*-
  [console_scripts]
  custom_celery = worker.celeryd:start_celery
  custom_celerybeat = worker.celeryd:start_celerybeat
  """,
  )

worker/celeryd.py:

def initialize_async_session(db_string, db_echo):

    import sqlalchemy as sa
    from db import Base, set_dbsession

    session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True))
    engine = sa.create_engine(db_string, echo=db_echo)
    session.configure(bind=engine)

    set_dbsession(session)
    Base.metadata.bind = engine


def start_celery():
    initialize_async_session(DB_STRING, DB_ECHO)
    import celery.bin.celeryd
    celery.bin.celeryd.main()

「バックエンド操作を実行するためにビューから起動されるスレッド」で使用している一般的なアプローチは、アプリケーションを本番サーバーにデプロイすることを計画している場合、私には少し危険に感じます-Webサーバーはしばしばリサイクル、強制終了、または作成します新しい「ワーカー」であるため、一般に、特定の各プロセスが現在の要求と応答のサイクルを超えて存続するという保証はありません。私はこれを試したことがないので、多分大丈夫でしょう:)

于 2013-05-02T19:53:11.083 に答える