I/O ノンブロッキング Python サーバー Tornado を使用しています。GET
完了するまでにかなりの時間がかかる可能性のあるリクエストのクラスがあります (5 ~ 10 秒の範囲で考えてください)。問題は、Tornado がこれらのリクエストをブロックするため、後続の高速リクエストが低速リクエストが完了するまで保留されることです。
https://github.com/facebook/tornado/wiki/Threading-and-concurrencyを見て、#3 (他のプロセス) と #4 (他のスレッド) の組み合わせが必要だという結論に達しました。#4 自体に問題があり、別のスレッドが「heavy_lifting」を実行しているときに、信頼できる制御を ioloop に戻すことができませんでした。(これは GIL と、heavy_lifting タスクの CPU 負荷が高く、メインの ioloop から制御を引き離し続けているという事実によるものだと思いますが、それは推測です)。
そのため、これらの遅いリクエスト内で「重労働」タスクをGET
別のプロセスで実行し、プロセスが完了してリクエストを終了するときにコールバックを Tornado ioloop に戻すことで、これを解決する方法のプロトタイプを作成しました。これにより、ioloop が解放され、他の要求を処理できるようになります。
考えられる解決策を示す簡単な例を作成しましたが、コミュニティからフィードバックを得たいと思っています。
私の質問は 2 つあります。この現在のアプローチをどのように単純化できますか? それにはどのような落とし穴が潜在的に存在しますか?
アプローチ
Tornado の
asynchronous
ビルトイン デコレータを利用して、リクエストを開いたままにし、ioloop を続行できるようにします。multiprocessing
Python のモジュールを使用して、「重労働」タスク用に別のプロセスを生成します。私は最初にこのモジュールを使用しようとしましthreading
たが、確実に制御を放棄して ioloop に戻すことができませんでした。またmutliprocessing
、マルチコアも活用するようです。モジュールを使用して、メインの ioloop プロセスで「ウォッチャー」スレッドを開始します。モジュールの仕事は、完了時に「重労働」タスクの結果
threading
を監視することです。multiprocessing.Queue
これが必要だったのは、この要求が終了したことを ioloop に通知しながら、heavy_lifting タスクが完了したことを知る方法が必要だったからです。「ウォッチャー」スレッドがメインの ioloop ループに頻繁に
time.sleep(0)
呼び出しを行って制御を放棄し、他の要求が引き続きすぐに処理されるようにしてください。tornado.ioloop.IOLoop.instance().add_callback()
キューに結果がある場合は、他のスレッドから ioloop インスタンスを呼び出す唯一の安全な方法であると文書化されている「ウォッチャー」スレッドからのコールバックを追加します。その後、必ず
finish()
コールバックを呼び出してリクエストを完了し、返信を渡してください。
以下は、このアプローチを示すサンプル コードです。 multi_tornado.py
は、上記の概要を実装するcall_multi.py
サーバーであり、サーバーをテストするために 2 つの異なる方法でサーバーを呼び出すサンプル スクリプトです。どちらのテストも、3 つの遅いGET
要求とそれに続く 20 の速いGET
要求でサーバーを呼び出します。スレッド化をオンにして実行した場合とオンにせずに実行した場合の両方の結果が示されています。
「スレッドなし」で実行した場合、3 つの遅い要求がブロックされます (それぞれが完了するまでに 1 秒強かかります)。20 の高速なリクエストのうちのいくつかは、ioloop 内の低速なリクエストの間に押し込まれます (どのように発生するかは完全にはわかりませんが、サーバーとクライアントの両方のテスト スクリプトを同じマシンで実行しているアーティファクトである可能性があります)。ここでのポイントは、すべての高速リクエストがさまざまな程度まで保留されることです。
スレッドを有効にして実行した場合、20 個の高速なリクエストはすべて最初にすぐに完了し、3 つの低速なリクエストはそれぞれが並行して実行されているため、その後ほぼ同時に完了します。これは望ましい動作です。3 つの遅い要求が並行して完了するのに 2.5 秒かかりますが、スレッド化されていない場合、3 つの遅い要求は合計で約 3.5 秒かかります。したがって、全体で約 35% の速度向上があります (マルチコア共有によるものと思われます)。しかし、もっと重要なことは、高速なリクエストが低速なリクエストの代わりにすぐに処理されたことです。
私はマルチスレッド プログラミングの経験があまりありません。
これを達成するためのより簡単な方法はありますか?このアプローチにはどんなモンスターが潜んでいる可能性がありますか?
(注: 将来のトレードオフは、ロード バランシングを行う nginx のようなリバース プロキシを使用して Tornado のインスタンスをさらに実行することかもしれません。ロード バランサーを使用して複数のインスタンスを実行することに関係なく、この問題にハードウェアを投入することだけが心配です。ブロッキングに関しては、ハードウェアが問題に直接結びついているように見えるためです。)
サンプルコード
multi_tornado.py
(サンプルサーバー):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(クライアントテスター):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
試験結果
実行python call_multi.py slow
することにより (ブロッキング動作):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
実行python call_multi.py slow_threaded
することにより(望ましい動作):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s