48

I/O ノンブロッキング Python サーバー Tornado を使用しています。GET完了するまでにかなりの時間がかかる可能性のあるリクエストのクラスがあります (5 ~ 10 秒の範囲で考えてください)。問題は、Tornado がこれらのリクエストをブロックするため、後続の高速リクエストが低速リクエストが完了するまで保留されることです。

https://github.com/facebook/tornado/wiki/Threading-and-concurrencyを見て、#3 (他のプロセス) と #4 (他のスレッド) の組み合わせが必要だという結論に達しました。#4 自体に問題があり、別のスレッドが「heavy_lifting」を実行しているときに、信頼できる制御を ioloop に戻すことができませんでした。(これは GIL と、heavy_lifting タスクの CPU 負荷が高く、メインの ioloop から制御を引き離し続けているという事実によるものだと思いますが、それは推測です)。

そのため、これらの遅いリクエスト内で「重労働」タスクをGET別のプロセスで実行し、プロセスが完了してリクエストを終了するときにコールバックを Tornado ioloop に戻すことで、これを解決する方法のプロトタイプを作成しました。これにより、ioloop が解放され、他の要求を処理できるようになります。

考えられる解決策を示す簡単な例を作成しましたが、コミュニティからフィードバックを得たいと思っています。

私の質問は 2 つあります。この現在のアプローチをどのように単純化できますか? それにはどのような落とし穴が潜在的に存在しますか?

アプローチ

  1. Tornado のasynchronousビルトイン デコレータを利用して、リクエストを開いたままにし、ioloop を続行できるようにします。

  2. multiprocessingPython のモジュールを使用して、「重労働」タスク用に別のプロセスを生成します。私は最初にこのモジュールを使用しようとしましthreadingたが、確実に制御を放棄して ioloop に戻すことができませんでした。またmutliprocessing、マルチコアも活用するようです。

  3. モジュールを使用して、メインの ioloop プロセスで「ウォッチャー」スレッドを開始します。モジュールの仕事は、完了時に「重労働」タスクの結果threadingを監視することです。multiprocessing.Queueこれが必要だったのは、この要求が終了したことを ioloop に通知しながら、heavy_lifting タスクが完了したことを知る方法が必要だったからです。

  4. 「ウォッチャー」スレッドがメインの ioloop ループに頻繁にtime.sleep(0)呼び出しを行って制御を放棄し、他の要求が引き続きすぐに処理されるようにしてください。

  5. tornado.ioloop.IOLoop.instance().add_callback()キューに結果がある場合は、他のスレッドから ioloop インスタンスを呼び出す唯一の安全な方法であると文書化されている「ウォッチャー」スレッドからのコールバックを追加します。

  6. その後、必ずfinish()コールバックを呼び出してリクエストを完了し、返信を渡してください。

以下は、このアプローチを示すサンプル コードです。 multi_tornado.pyは、上記の概要を実装するcall_multi.pyサーバーであり、サーバーをテストするために 2 つの異なる方法でサーバーを呼び出すサンプル スクリプトです。どちらのテストも、3 つの遅いGET要求とそれに続く 20 の速いGET要求でサーバーを呼び出します。スレッド化をオンにして実行した場合とオンにせずに実行した場合の両方の結果が示されています。

「スレッドなし」で実行した場合、3 つの遅い要求がブロックされます (それぞれが完了するまでに 1 秒強かかります)。20 の高速なリクエストのうちのいくつかは、ioloop 内の低速なリクエストの間に押し込まれます (どのように発生するかは完全にはわかりませんが、サーバーとクライアントの両方のテスト スクリプトを同じマシンで実行しているアーティファクトである可能性があります)。ここでのポイントは、すべての高速リクエストがさまざまな程度まで保留されることです。

スレッドを有効にして実行した場合、20 個の高速なリクエストはすべて最初にすぐに完了し、3 つの低速なリクエストはそれぞれが並行して実行されているため、その後ほぼ同時に完了します。これは望ましい動作です。3 つの遅い要求が並行して完了するのに 2.5 秒かかりますが、スレッド化されていない場合、3 つの遅い要求は合計で約 3.5 秒かかります。したがって、全体で約 35% の速度向上があります (マルチコア共有によるものと思われます)。しかし、もっと重要なことは、高速なリクエストが低速なリクエストの代わりにすぐに処理されたことです。

私はマルチスレッド プログラミングの経験があまりありません。

これを達成するためのより簡単な方法はありますか?このアプローチにはどんなモンスターが潜んでいる可能性がありますか?

(注: 将来のトレードオフは、ロード バランシングを行う nginx のようなリバース プロキシを使用して Tornado のインスタンスをさらに実行することかもしれません。ロード バランサーを使用して複数のインスタンスを実行することに関係なく、この問題にハードウェアを投入することだけが心配です。ブロッキングに関しては、ハードウェアが問題に直接結びついているように見えるためです。)

サンプルコード

multi_tornado.py(サンプルサーバー):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py(クライアントテスター):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

試験結果

実行python call_multi.py slow することにより (ブロッキング動作):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

実行python call_multi.py slow_threaded することにより(望ましい動作):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
4

3 に答える 3

32

concurrent.futures.ProcessPoolExecutorの代わりに使用する場合multiprocessing、これは実際には非常に簡単です。Tornado の ioloop は既に をサポートしている concurrent.futures.Futureため、すぐにうまく連携できます。concurrent.futuresは Python 3.2+ に含まれており、Python 2.x にバックポートされています。

次に例を示します。

import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)

出力:

running it asynchronously
got 1 2 3 and ok
it returned hey there

ProcessPoolExecutorの API は よりも制限されていますmultiprocessing.Poolが、 のより高度な機能が必要ない場合はmultiprocessing.Pool、統合が非常に簡単であるため、使用する価値があります。

于 2014-08-08T16:40:38.050 に答える
16

multiprocessing.PoolI/O ループに統合できますがtornado、少し面倒です。を使用すると、よりクリーンな統合を行うことができますconcurrent.futures(詳細については、私の他の回答を参照してください)。ただし、Python 2.x に行き詰まっていてバックポートをインストールできない場合は、次のconcurrent.futuresように厳密に使用できますmultiprocessing

multiprocessing.Pool.apply_asyncメソッドとメソッドのmultiprocessing.Pool.map_async両方にオプションのcallbackパラメータがあります。つまり、両方を にプラグインできる可能性がありtornado.gen.Taskます。したがって、ほとんどの場合、サブプロセスでコードを非同期に実行するのは次のように簡単です。

import multiprocessing
import contextlib

from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial

def worker():
    print "async work here"

@gen.coroutine
def async_run(func, *args, **kwargs):
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)

前述したように、これはほとんどの場合にうまく機能します。しかし、ifworker()が例外をスローするcallbackと、呼び出されることはありません。つまり、gen.Task決して終了せず、永遠にハングアップします。ここで、作業で例外がスローされないことがわかっている場合(たとえば、全体をtry/でラップしたためexcept)、このアプローチを問題なく使用できます。ただし、ワーカーから例外を逃がしたい場合、私が見つけた唯一の解決策は、いくつかのマルチプロセッシング コンポーネントをサブクラス化しcallback、ワーカー サブプロセスが例外を発生させた場合でもそれらを呼び出させることでした。

from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
 ...

 if __name__ == "__main__":
     pool = TornadoPool(multiprocessing.cpu_count())
     ...

これらの変更により、例外オブジェクトは無期限にハングアップするのgen.Taskではなく、によって返されます。また、返されたときに例外を再発生させるようgen.Taskにメソッドを更新し、ワーカーのサブプロセスでスローされた例外のトレースバックを改善するためにいくつかの変更を加えました。async_run完全なコードは次のとおりです。

import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen

class WrapException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]   

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception 
    (or an Exception sub-class), this function will raise the 
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner

# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")

@handle_exceptions
def test(x):
    print x
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()

クライアントに対する動作は次のとおりです。

dan@dan:~$ curl localhost:8888/test
done
Caught an exception: 

Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee

また、2 つの curl リクエストを同時に送信すると、それらがサーバー側で非同期に処理されていることがわかります。

dan@dan:~$ ./mutli.py 
inside get
inside get
caught exception inside get
caught exception inside get

編集:

error_callbackPython 3 では、すべての非同期multiprocessing.Poolメソッドにキーワード引数が導入されているため、このコードがより単純になることに注意してください。これにより、Tornado との統合がはるかに簡単になります。

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        super().apply_async(func, args, kwds, callback=callback,
                            error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

オーバーライドで行う必要があるのは、 kwargに加えて、キーワード引数apply_asyncを使用して親を呼び出すことだけです。オーバーライドする必要はありません。error_callbackcallbackApplyResult

で MetaClass を使用してTornadoPool、その*_asyncメソッドをコルーチンであるかのように直接呼び出すことができるようにすることで、さらに洗練されたものにすることができます。

import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.

    """
    future = Future()
    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True
    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)
    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()
于 2014-05-01T16:30:01.150 に答える
1

get リクエストにそれほど時間がかかる場合、tornado は不適切なフレームワークです。

nginx を使用して、tornado への高速な取得と低速の取得を別のサーバーにルーティングすることをお勧めします。

PeterBeには、複数の Tornado サーバーを実行し、実行時間の長い要求を処理するためにそのうちの 1 つを「遅いサーバー」に設定するという興味深い記事があります。

于 2013-03-13T16:04:11.867 に答える