2

セロリタスクがスコープセッションを使用してmysqlと対話するアプリでsqlalchemyを使用しています。

db.py

engine = create_engine(
    'mysql://root:pass@localhost/testdb?charset=utf8&use_unicode=0',
    pool_recycle=3600)

db_session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

定期的に実行するようにスケジュールされているセロリ タスクを使用し (セロリ ビートを使用)、セッションを使用するタスクの TaskSet を作成します。

@celery.task
def create_tasks():    
    tasks = []
    tasks.append(charge_now.subtask((tid,)))
    job = TaskSet(tasks)
    job.apply_async()

以下は、TaskSetが構成されているタスクです

@celery.task
def charge_now(tid):
    transaction = db_session.query(TransactionInfo).get(tid)

(後で挿入を行っています。わかりやすくするために、コードからそれといくつかのロジックを省略しました)

以下は私が得るエラーです

タスク tasks.charge_now[bf199346-0862-4709-95b8-053859ed5a6f] で例外が発生しました: ResourceClosedError('この結果オブジェクトは行を返しません。自動的に閉じられました。',)

同じセッションを共有しようとする複数のスレッドによって作成された問題であることは理解していますが、誰かがこれを修正する方法を指摘できますか?

4

0 に答える 0