私は Python アプリに取り組んでいますが、Flask から Quart に移行しています。アプリケーションには、アプリケーションの実行中に常に実行されるバックグラウンド タスクが必要です。
control-c を使用してプロセスを停止しようとすると、スレッドが正常に閉じず、シャットダウン ルーチンの while ループにとどまります。
while not self._master_thread_class.shutdown_completed:
if not pro:
print('[DEBUG] Thread is not complete')
pro = True
このStackoverflow の質問に従いましたが、バックグラウンド スレッドをきれいにシャットダウンする方法がわかりません。Quart のドキュメントに少し欠けているように見えるので、説明をお願いします。
MasterThread クラス:
import asyncio
class MasterThread:
def __init__(self, shutdown_requested_event):
self._shutdown_completed = False
self._shutdown_requested_event = shutdown_requested_event
self._shutdown_requested = False
def __del__(self):
print('Thread was deleted')
def run(self, loop) -> None:
asyncio.set_event_loop(loop)
loop.run_until_complete(self._async_entrypoint())
@asyncio.coroutine
def _async_entrypoint(self) -> None:
while not self. _shutdown_requested and \
not self._shutdown_requested_event.isSet():
#print('_main_loop()')
pass
if self._shutdown_requested_event.wait(0.1):
self. _shutdown_requested = True
print('[DEBUG] thread has completed....')
self._shutdown_completed = True
def _main_loop(self) -> None:
print('_main_loop()')
メイン アプリケーション モジュール:
import asyncio
import threading
from quart import Quart
from workthr import MasterThread
app = Quart(__name__)
class Service:
def __init__(self):
self._shutdown_thread_event = threading.Event()
self._master_thread = MasterThread(self._shutdown_thread_event)
self._thread = None
def __del__(self):
self.stop()
def start(self):
loop = asyncio.get_event_loop()
self._thread = threading.Thread(target=self._master_thread.run, args=(loop,))
self._thread.start()
return True
def stop(self) -> None:
print('[DEBUG] Stop signal caught...')
self._shutdown_thread_event.set()
while not self._master_thread.shutdown_completed:
print('[DEBUG] Thread is not complete')
print('[DEBUG] Thread has completed')
self._shutdown()
def _shutdown(self):
print('Shutting down...')
service = Service()
service.start()