95

フラスコを試すために小さなゲームサーバーを書くのに忙しい。ゲームはRESTを介してAPIをユーザーに公開します。"game world"ユーザーがアクションを実行したりデータをクエリしたりするのは簡単ですが、ゲームエンティティなどを更新するためにループの外側にサービスを提供したいと思いますapp.run()。Flaskが非常にクリーンに実装されているので、Flaskの方法があるかどうかを確認したいと思います。これ。

4

3 に答える 3

97

追加のスレッドは、WSGIサーバーによって呼び出されるのと同じアプリから開始する必要があります。

以下の例では、5秒ごとに実行され、Flaskでルーティングされた関数でも使用できるデータ構造を操作するバックグラウンドスレッドを作成します。

import threading
import atexit
from flask import Flask

POOL_TIME = 5 #Seconds
    
# variables that are accessible from anywhere
commonDataStruct = {}
# lock to control access to variable
dataLock = threading.Lock()
# thread handler
yourThread = threading.Thread()

def create_app():
    app = Flask(__name__)

    def interrupt():
        global yourThread
        yourThread.cancel()

    def doStuff():
        global commonDataStruct
        global yourThread
        with dataLock:
            pass
            # Do your stuff with commonDataStruct Here

        # Set the next thread to happen
        yourThread = threading.Timer(POOL_TIME, doStuff, ())
        yourThread.start()   

    def doStuffStart():
        # Do initialisation stuff here
        global yourThread
        # Create your thread
        yourThread = threading.Timer(POOL_TIME, doStuff, ())
        yourThread.start()

    # Initiate
    doStuffStart()
    # When you kill Flask (SIGTERM), clear the trigger for the next thread
    atexit.register(interrupt)
    return app

app = create_app()          

Gunicornから次のように呼び出します。

gunicorn -b 0.0.0.0:5000 --log-config log.conf --pid=app.pid myfile:app
于 2014-04-06T21:27:07.707 に答える
10

純粋なスレッドまたはCeleryキューを使用することに加えて(flask-celeryは不要になったことに注意してください)、flask-apschedulerを確認することもできます。

https://github.com/viniciuschiele/flask-apscheduler

https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.pyからコピーした簡単な例:

from flask import Flask
from flask_apscheduler import APScheduler


class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'jobs:job1',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }
    ]

    SCHEDULER_API_ENABLED = True


def job1(a, b):
    print(str(a) + ' ' + str(b))

if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())

    scheduler = APScheduler()
    # it is also possible to enable the API directly
    # scheduler.api_enabled = True
    scheduler.init_app(app)
    scheduler.start()

    app.run()
于 2017-07-19T04:18:49.060 に答える
0

まず、WebSocketまたはポーリングの仕組みを使用して、発生した変更についてフロントエンド部分に通知する必要があります。私はラッパーを使用しており、小さなアプリの非同期メッセージングFlask-SocketIOに非常に満足しています。

ネストでは、必要なすべてのロジックを個別のスレッドで実行し、SocketIOオブジェクトを介してフロントエンドに通知できます(Flaskはすべてのフロントエンドクライアントとの継続的なオープン接続を保持します)。

例として、バックエンドファイルの変更時にページの再読み込みを実装しました。

<!doctype html>
<script>
    sio = io()

    sio.on('reload',(info)=>{
        console.log(['sio','reload',info])
        document.location.reload()
    })
</script>
class App(Web, Module):

    def __init__(self, V):
        ## flask module instance
        self.flask = flask
        ## wrapped application instance
        self.app = flask.Flask(self.value)
        self.app.config['SECRET_KEY'] = config.SECRET_KEY
        ## `flask-socketio`
        self.sio = SocketIO(self.app)
        self.watchfiles()

    ## inotify reload files after change via `sio(reload)``
    def watchfiles(self):
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
        class Handler(FileSystemEventHandler):
            def __init__(self,sio):
                super().__init__()
                self.sio = sio
            def on_modified(self, event):
                print([self.on_modified,self,event])
                self.sio.emit('reload',[event.src_path,event.event_type,event.is_directory])
        self.observer = Observer()
        self.observer.schedule(Handler(self.sio),path='static',recursive=True)
        self.observer.schedule(Handler(self.sio),path='templates',recursive=True)
        self.observer.start()


于 2021-02-08T11:38:33.353 に答える