42

ピラミッドで作成され、gunicorn+nginx を介して提供される Web アプリがあります。8 つのワーカー スレッド/プロセスで動作します

仕事が必要だったので、apscheduler を選択しました。これが私たちがそれを起動する方法です

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

問題は、gunicorn のすべてのワーカー プロセスがスケジューラを選択することです。ファイル ロックを実装しようとしましたが、十分な解決策ではないようです。特定の時点でワーカー プロセスの 1 つだけがスケジュールされたイベントを取得し、他のスレッドが次までそれを取得しないようにするための最良の方法は何でしょうJOB_INTERVALか?

後で apache2+modwsgi に切り替える場合に備えて、このソリューションは mod_wsgi でも機能する必要があります。ウェイトレスである単一プロセス開発サーバーで動作する必要があります。

バウンティ スポンサーからの最新情報

Djangoアプリだけで、OPで説明されているのと同じ問題に直面しています。元の質問の場合、この詳細を追加してもあまり変わらないと確信しています。このため、もう少し可視性を高めるために、この質問にもタグを付けましたdjango

4

3 に答える 3

47

Gunicorn は 8 つのワーカーで開始されるため (この例では)、これはアプリを 8 回フォークして 8 つのプロセスにします。これらの 8 つのプロセスはマスタープロセスからフォークされ、各プロセスのステータスを監視し、ワーカーを追加/削除する機能を備えています。

各プロセスは、最初はマスター プロセスの APScheduler の正確なコピーである APScheduler オブジェクトのコピーを取得します。これにより、「n 番目」の各ワーカー (プロセス) が各ジョブを合計で「n」回実行することになります。

これを回避するハックは、以下のオプションで gunicorn を実行することです:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

この--preloadフラグは、Gunicorn に「ワーカー プロセスを fork する前にアプリをロードする」ように指示します。そうすることで、各ワーカーには「アプリ自体をインスタンス化するのではなく、マスターによって既にインスタンス化されたアプリのコピーが与えられます」。これは、次のコードがマスター プロセスで 1 回だけ実行されることを意味します。

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

さらに、ジョブストアを :memory: 以外のものに設定する必要がありますこのように、各ワーカーは他の 7 つのワーカーと通信できない独自の独立したプロセスですが、(メモリではなく) ローカル データベースを使用することで、単一のワーカーを保証します。ジョブストアでの CRUD 操作の -point-of-truth。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = Scheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

最後に、 BackgroundSchedulerの実装により、BackgroundScheduler を使用したいと考えていますstart()。BackgroundSchedulerを呼び出すstart()と、新しいスレッドがバックグラウンドで起動され、ジョブのスケジューリング/実行を担当します。これは重要です。なぜなら、ステップ (1) で覚えているように、--preloadフラグによりstart()​​、Master Gunicorn プロセスで関数を 1 回だけ実行するからです。定義上、フォークされたプロセスはその親のスレッドを継承しないため、各ワーカーは BackgroundScheduler スレッドを実行しません。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

このすべての結果として、すべての Gunicorn ワーカーは、だまされて「STARTED」状態になった APScheduler を持っていますが、親のスレッドをドロップするため、実際には実行されていません! 各インスタンスは、ジョブを実行するだけでなく、ジョブストア データベースを更新することもできます。

APSchedulerを Web サーバー (Gunicorn など) で実行し、各ジョブの CRUD 操作を有効にする簡単な方法については、flask-APSchedulerを確認してください。

于 2016-10-20T18:47:56.910 に答える
22

非常によく似た問題を持つ Django プロジェクトで機能する修正を見つけました。スケジューラーの初回起動時に TCP ソケットをバインドし、その後それをチェックするだけです。次のコードは、マイナーな調整でうまくいくと思います。

import sys, socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print "!!!scheduler already started, DO NOTHING"
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print "scheduler started"
于 2014-12-04T20:52:03.667 に答える