24

私はFlaskを使用して、 k8055USBインターフェイスボード用のシンプルなWebAPIを提供してきました。かなり標準的なゲッターとパター、そしてフラスコは本当に私の人生をずっと楽にしてくれました。

しかし、ホエイが発生したときの状態の変化を/近くに登録できるようにしたいと思います。

たとえば、ボードにボタンが接続されている場合、その特定のポートのAPIをポーリングできます。しかし、誰かがAPIと話しているかどうかに関係なく、出力に出力を直接反映させたい場合は、次のようになります。

while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)

そして毎秒、出力は入力と一致するように更新されます。

Flaskの下でこの種のことを行う方法はありますか?私は以前にツイストで同様のことをしましたが、Flaskはこの特定のアプリケーションにはまだそれをあきらめるにはあまりにも便利です...

ありがとう。

4

3 に答える 3

32

私のFlaskアプリケーションでは、Pashkaが彼の回答スケジュールライブラリ、およびAPSchedulerで説明しているcronアプローチを使用することを検討しました。

APSchedulerはシンプルで、定期的なタスク実行の目的を果たしていることがわかったので、APSchedulerを使用しました。

コード例:

from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler


app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()
于 2018-01-03T08:34:21.187 に答える
15

単純なタスクにはcronを使用できます。

タスクのフラスコビューを作成します。

# a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs

次に、cronを使用して、そのURLから定期的にダウンロードします

# cron task to run each minute
0-59 * * * * run_task.sh

run_task.shの内容はどこにありますか

wget http://localhost/task

Cronは、1分に1回より頻繁に実行することはできません。より高い頻度が必要な場合(たとえば、5秒ごと= 1分あたり12回)、次のようにtun_task.shで実行する必要があります。

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done
于 2013-09-26T12:06:07.787 に答える
1

Flaskにはタスクのサポートはありませんが、flask-celeryを使用することも、関数を別のスレッド(グリーンレット)で実行することもできます。

于 2012-08-04T17:19:41.207 に答える