ブロックすることが予想される (Tornado の非同期 HTTP 要求クライアントなどを使用するように簡単に変更できない) 私の Tornado サーバーでの操作については、multiprocessing
モジュールを使用して作業を分離したワーカー プロセスにオフロードしています。具体的には、マルチプロセッシングを使用していました。これは、引数の 1 つとしてコールバックを受け取るため、Tornado と非常にうまく連携Pool
する というメソッドを提供するためです。apply_async
私は最近、プールがプロセスの数を事前に割り当てていることに気付きました。そのため、それらがすべてブロックされると、新しいプロセスを必要とする操作は待機する必要があります。サーバーは、タスクキューに何かを追加することで機能するため、サーバーがまだ接続できることを認識していますがapply_async
、それ自体はすぐに終了しますが、実行する必要があるn個のブロックタスクに対してn個のプロセスを生成しようとしています。
Tornado サーバーの IOLoop のメソッドを使用add_handler
して、その IOLoop に作成する新しい PID ごとにハンドラーを追加できると考えました。以前に似たようなことをしたことがありますが、popen と任意のコマンドを使用していました。このメソッドの使用例はこちらです。ただし、スコープ内の任意のターゲット Python 関数に引数を渡したかったので、multiprocessing
.
multiprocessing.Process
しかし、私のオブジェクトが持っているPID が気に入らないようです。私は得るIOError: [Errno 9] Bad file descriptor
。これらのプロセスは何らかの形で制限されていますか? 実際にプロセスを開始するまで PID を使用できないことはわかっていますが、プロセスを開始します。この問題を示す、私が作成した例のソース コードを次に示します。
#!/usr/bin/env python
"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""
import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time
__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'
def sleepy(queue):
"""Pushes a string to the queue after sleeping for 5 seconds.
This sleeping can be thought of as a blocking operation."""
time.sleep(5)
queue.put("Now I'm awake.")
return
def random_num():
"""Returns a string containing a random number.
This function can be used by handlers to receive text for writing which
facilitates noticing change on the webpage when it is refreshed."""
n = random.random()
return "<br />Here is a random number to show change: {0}".format(n)
class SyncHandler(tornado.web.RequestHandler):
"""Demonstrates handing a request synchronously.
It executes sleepy() before writing some more text and a random number to
the webpage. While the process is sleeping, the Tornado server cannot
handle any requests at all."""
def get(self):
q = mp.Queue()
sleepy(q)
val = q.get()
self.write(val)
self.write('<br />Brought to you by SyncHandler.')
self.write('<br />Try refreshing me and then the main page.')
self.write(random_num())
class AsyncHandler(tornado.web.RequestHandler):
"""Demonstrates handing a request asynchronously.
It executes sleepy() before writing some more text and a random number to
the webpage. It passes the sleeping function off to another process using
the multiprocessing module in order to handle more requests concurrently to
the sleeping, which is like a blocking operation."""
@tornado.web.asynchronous
def get(self):
"""Handles the original GET request (normal function delegation).
Instead of directly invoking sleepy(), it passes a reference to the
function to the multiprocessing pool."""
# Create an interprocess data structure, a queue.
q = mp.Queue()
# Create a process for the sleepy function. Provide the queue.
p = mp.Process(target=sleepy, args=(q,))
# Start it, but don't use p.join(); that would block us.
p.start()
# Add our callback function to the IOLoop. The async_callback wrapper
# makes sure that Tornado sends an HTTP 500 error to the client if an
# uncaught exception occurs in the callback.
iol = tornado.ioloop.IOLoop.instance()
print "p.pid:", p.pid
iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
def _finish(self, q):
"""This is the callback for post-sleepy() request handling.
Operation of this function occurs in the original process."""
val = q.get()
self.write(val)
self.write('<br />Brought to you by AsyncHandler.')
self.write('<br />Try refreshing me and then the main page.')
self.write(random_num())
# Asynchronous handling must be manually finished.
self.finish()
class MainHandler(tornado.web.RequestHandler):
"""Returns a string and a random number.
Try to access this page in one window immediately after (<5 seconds of)
accessing /async or /sync in another window to see the difference between
them. Asynchronously performing the sleepy() function won't make the client
wait for data from this handler, but synchronously doing so will!"""
def get(self):
self.write('This is just responding to a simple request.')
self.write('<br />Try refreshing me after one of the other pages.')
self.write(random_num())
if __name__ == '__main__':
# Create an application using the above handlers.
application = tornado.web.Application([
(r"/", MainHandler),
(r"/sync", SyncHandler),
(r"/async", AsyncHandler),
])
# Create a single-process Tornado server from the application.
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8888)
print 'The HTTP server is listening on port 8888.'
tornado.ioloop.IOLoop.instance().start()
トレースバックは次のとおりです。
Traceback (most recent call last):
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
yield
File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
getattr(self, self.request.method.lower())(*args, **kwargs)
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
return method(self, *args, **kwargs)
File "./process_async.py", line 73, in get
iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor
上記のコードは、プロセス プールを使用した古い例を実際に変更したものです。私は同僚と私自身の参照用にそれを保存してきました (そのため大量のコメントがあります)。2 つの小さなブラウザー ウィンドウを並べて開いて、/sync URI は接続をブロックし、/async はより多くの接続を許可することを上司に示すことができるように、私はそれを作成しました。この質問の目的のために、それを再現するために必要なことは、/async ハンドラーにアクセスすることだけです。すぐにエラーになります。
これについてどうすればよいですか?PIDはどのように「悪い」ことができますか? プログラムを実行すると、標準出力に出力されることがわかります。
記録として、Ubuntu 10.04 で Python 2.6.5 を使用しています。トルネードは1.1です。