リクエスト後にデータを取得するウェイトレスを備えたフラスコアプリがあり、長い計算を実行してlong_function
結果を返します。これらの計算は並列でpebble
あり、タイムアウト オプションが必要なため使用しています。また、ユーザーがサーバーを再起動するリクエストを送信できるようにしたい (つまり、スレッドの数を変更したいwaitress
)
私はこの解決策を見つけましたhttps://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151pebble
ほとんどの場合は機能しますが、プール
とうまくやり取りしません。サーバーがプールにあるときにサーバーをリロードできません。マルチプロセッシングを使用long_function_without_pool
しない を使用すると、サーバーが現在何らかのジョブを実行していても、サーバーをリロードできます (もちろん、結果は失われますが、これが私が望んでいることです)。しかしlong_function
、プールが閉じられるのを待つ必要があり、それからサーバーを再起動できます。プールがまだ開いているときに再起動要求を送信しようとすると、エラーが発生します。
OSError: [Errno 98] Address already in use
したがって、ランニングp.terminate()
がある場合は機能しないと思います。Pool
このコードを修正するにはどうすればよいですか、または別のソリューションを使用する必要がありますか?
このエラーを再現する簡単な手順:
アプリを起動
本文が空の POST リクエストをhttp://localhost:5221/に送信します
応答を受け取る前に (5 秒かかります)、GET 要求を変数なしでhttp://localhost:5221/restart/に送信します。
楽しい。サーバーがスタックしており、何にも応答していません
import subprocess from flask import Flask from flask_restful import Api, Resource from flask_cors import CORS from webargs.flaskparser import parser, abort import json import time import sys from waitress import serve from multiprocessing import Process, Queue from concurrent.futures import TimeoutError from pebble import ProcessPool, ProcessExpired import functools some_queue = None APP = Flask(__name__) API = Api(APP) CORS(APP) @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp') def restart(): try: some_queue.put("something") print("Restarted successfully") return("Quit") except: print("Failed in restart") return "Failed" def start_flaskapp(queue): global some_queue some_queue = queue API.add_resource(FractionsResource, "/") serve(APP, host='0.0.0.0', port=5221, threads=2) def long_function(): with ProcessPool(5) as pool: data = [0, 1, 2, 3, 4] future = pool.map(functools.partial(add_const, const=1), data, timeout=5) iterator = future.result() result=[] while True: try: result.append(next(iterator)) except StopIteration: break except TimeoutError as error: print("function took longer than %d seconds" % error.args[1]) return(result) def long_function_without_pool(): data = [0, 1, 2, 3, 4] result = list(map(functools.partial(add_const, const=1), data)) return(result) def add_const(number, const=0): time.sleep(5) return number+const class FractionsResource(Resource): @APP.route('/', methods=['POST']) def post(): response = long_function() return(json.dumps(response)) if __name__ == "__main__": q = Queue() p = Process(target=start_flaskapp, args=(q,)) p.start() while True: #wathing queue, if there is no call than sleep, otherwise break if q.empty(): time.sleep(1) else: break p.terminate() #terminate flaskapp and then restart the app on subprocess args = [sys.executable] + [sys.argv[0]] subprocess.call(args)