2

リクエスト後にデータを取得するウェイトレスを備えたフラスコアプリがあり、長い計算を実行してlong_function結果を返します。これらの計算は並列でpebbleあり、タイムアウト オプションが必要なため使用しています。また、ユーザーがサーバーを再起動するリクエストを送信できるようにしたい (つまり、スレッドの数を変更したいwaitress)

私はこの解決策を見つけましたhttps://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151pebbleほとんどの場合は機能しますが、プール とうまくやり取りしません。サーバーがプールにあるときにサーバーをリロードできません。マルチプロセッシングを使用long_function_without_poolしない を使用すると、サーバーが現在何らかのジョブを実行していても、サーバーをリロードできます (もちろん、結果は失われますが、これが私が望んでいることです)。しかしlong_function、プールが閉じられるのを待つ必要があり、それからサーバーを再起動できます。プールがまだ開いているときに再起動要求を送信しようとすると、エラーが発生します。

OSError: [Errno 98] Address already in use

したがって、ランニングp.terminate()がある場合は機能しないと思います。Pool

このコードを修正するにはどうすればよいですか、または別のソリューションを使用する必要がありますか?

このエラーを再現する簡単な手順:

  1. アプリを起動

  2. 本文が空の POST リクエストをhttp://localhost:5221/に送信します

  3. 応答を受け取る前に (5 秒かかります)、GET 要求を変数なしでhttp://localhost:5221/restart/に送信します。

  4. 楽しい。サーバーがスタックしており、何にも応答していません

    import subprocess
    from flask import Flask
    from flask_restful import Api, Resource
    from flask_cors import CORS
    from webargs.flaskparser import parser, abort
    import json
    import time
    import sys
    from waitress import serve
    from multiprocessing import Process, Queue
    from concurrent.futures import TimeoutError
    from pebble import ProcessPool, ProcessExpired
    import functools
    
    some_queue = None
    
    
    APP = Flask(__name__)
    API = Api(APP)
    CORS(APP)
    
    @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp')
    def restart():
        try:
            some_queue.put("something")
            print("Restarted successfully")
            return("Quit")
        except:
            print("Failed in restart")
            return "Failed"
    
    def start_flaskapp(queue):
        global some_queue
        some_queue = queue
        API.add_resource(FractionsResource, "/")
        serve(APP, host='0.0.0.0', port=5221, threads=2)
    
    def long_function():
        with ProcessPool(5) as pool:
            data = [0, 1, 2, 3, 4]
            future = pool.map(functools.partial(add_const, const=1), data, timeout=5)
            iterator = future.result()
            result=[]
            while True:
                try:
                    result.append(next(iterator))
                except StopIteration:
                    break
                except TimeoutError as error:
                    print("function took longer than %d seconds" % error.args[1])
        return(result)
    
    def long_function_without_pool():
        data = [0, 1, 2, 3, 4]
        result = list(map(functools.partial(add_const, const=1), data))
        return(result)
    
    def add_const(number, const=0):
        time.sleep(5)
        return number+const
    
    class FractionsResource(Resource):
        @APP.route('/', methods=['POST'])
        def post():
            response = long_function()
            return(json.dumps(response))
    
    if __name__ == "__main__":
    
        q = Queue()
        p = Process(target=start_flaskapp, args=(q,))
        p.start()
        while True: #wathing queue, if there is no call than sleep, otherwise break
            if q.empty():
                time.sleep(1)
            else:
                break
        p.terminate() #terminate flaskapp and then restart the app on subprocess
        args = [sys.executable] + [sys.argv[0]]
        subprocess.call(args)
    
4

0 に答える 0