フラスコを試すために小さなゲームサーバーを書くのに忙しい。ゲームはRESTを介してAPIをユーザーに公開します。"game world"
ユーザーがアクションを実行したりデータをクエリしたりするのは簡単ですが、ゲームエンティティなどを更新するためにループの外側にサービスを提供したいと思いますapp.run()
。Flaskが非常にクリーンに実装されているので、Flaskの方法があるかどうかを確認したいと思います。これ。
3 に答える
追加のスレッドは、WSGIサーバーによって呼び出されるのと同じアプリから開始する必要があります。
以下の例では、5秒ごとに実行され、Flaskでルーティングされた関数でも使用できるデータ構造を操作するバックグラウンドスレッドを作成します。
import threading
import atexit
from flask import Flask
POOL_TIME = 5 #Seconds
# variables that are accessible from anywhere
commonDataStruct = {}
# lock to control access to variable
dataLock = threading.Lock()
# thread handler
yourThread = threading.Thread()
def create_app():
app = Flask(__name__)
def interrupt():
global yourThread
yourThread.cancel()
def doStuff():
global commonDataStruct
global yourThread
with dataLock:
pass
# Do your stuff with commonDataStruct Here
# Set the next thread to happen
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()
def doStuffStart():
# Do initialisation stuff here
global yourThread
# Create your thread
yourThread = threading.Timer(POOL_TIME, doStuff, ())
yourThread.start()
# Initiate
doStuffStart()
# When you kill Flask (SIGTERM), clear the trigger for the next thread
atexit.register(interrupt)
return app
app = create_app()
Gunicornから次のように呼び出します。
gunicorn -b 0.0.0.0:5000 --log-config log.conf --pid=app.pid myfile:app
純粋なスレッドまたはCeleryキューを使用することに加えて(flask-celeryは不要になったことに注意してください)、flask-apschedulerを確認することもできます。
https://github.com/viniciuschiele/flask-apscheduler
https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.pyからコピーした簡単な例:
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
JOBS = [
{
'id': 'job1',
'func': 'jobs:job1',
'args': (1, 2),
'trigger': 'interval',
'seconds': 10
}
]
SCHEDULER_API_ENABLED = True
def job1(a, b):
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
# it is also possible to enable the API directly
# scheduler.api_enabled = True
scheduler.init_app(app)
scheduler.start()
app.run()
まず、WebSocketまたはポーリングの仕組みを使用して、発生した変更についてフロントエンド部分に通知する必要があります。私はラッパーを使用しており、小さなアプリの非同期メッセージングFlask-SocketIO
に非常に満足しています。
ネストでは、必要なすべてのロジックを個別のスレッドで実行し、SocketIO
オブジェクトを介してフロントエンドに通知できます(Flaskはすべてのフロントエンドクライアントとの継続的なオープン接続を保持します)。
例として、バックエンドファイルの変更時にページの再読み込みを実装しました。
<!doctype html>
<script>
sio = io()
sio.on('reload',(info)=>{
console.log(['sio','reload',info])
document.location.reload()
})
</script>
class App(Web, Module):
def __init__(self, V):
## flask module instance
self.flask = flask
## wrapped application instance
self.app = flask.Flask(self.value)
self.app.config['SECRET_KEY'] = config.SECRET_KEY
## `flask-socketio`
self.sio = SocketIO(self.app)
self.watchfiles()
## inotify reload files after change via `sio(reload)``
def watchfiles(self):
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class Handler(FileSystemEventHandler):
def __init__(self,sio):
super().__init__()
self.sio = sio
def on_modified(self, event):
print([self.on_modified,self,event])
self.sio.emit('reload',[event.src_path,event.event_type,event.is_directory])
self.observer = Observer()
self.observer.schedule(Handler(self.sio),path='static',recursive=True)
self.observer.schedule(Handler(self.sio),path='templates',recursive=True)
self.observer.start()