5

誰もが django で長時間実行されるタスクを処理する方法を知っていると思います: セロリを使用してリラックスします。しかし、aiohttp (またはトルネード) を使用して Websocket の利点を得たい場合はどうすればよいでしょうか?

数秒から数分 (5 ~ 10 分) かかる非常に CPU バウンドのタスクがあるとします。このタスクを websocket ループで処理し、進行状況をユーザーに通知するのはかなり良い考えのようです。ajax リクエストがなく、短いタスクに対する応答が非常に高速です。

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:     
            answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
                long_running_task(msg.data, NotificationHelper(ws))
            ws.send_str(json.dumps({
                'action': 'got-answer',
                'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
            }))
    return ws

しかし一方で、私が理解しているように、そのような方法で提供されるCPUバウンドタスクはスレッド全体をブロックします。アプリケーションを使用したい 10 人のワーカーと 11 人のクライアントがいる場合、1 番目のクライアントのタスクが完了するまで、11 番目のクライアントは提供されません。

たぶん、セロリで大きく見えるタスクとメインループで小さく見えるタスクを実行する必要がありますか?

それで、私の質問: 非同期サーバーで長時間実行されるタスクを処理するための適切な設計パターンはありますか?

ありがとう!

4

1 に答える 1

8

実行時間の長い CPU バウンド タスクをで実行しloop.run_in_executor()、進行状況の通知を で送信するだけloop.call_soon_threadsafe()です。

ジョブが CPU ではなく IO バウンドの場合 (メールの送信など)、loop.create_task()呼び出しで新しいタスクを作成できます。新しいスレッドを生成するようです。

ファイア アンド フォーゲット アプローチを使用できない場合は、RabbitMQ のような永続的なメッセージ ブローカーを使用する必要があります ( Rabbit と非同期で通信するためのhttps://github.com/benjamin-hodgson/asynqpライブラリがあります)。

于 2016-02-12T21:11:03.033 に答える