4

CherryPy サーバーはスレッドを使用してリクエストを処理します。私のスレッド化されたサーバーの特定のメソッドは非常に複雑で CPU 負荷が高いため、メソッド要求スレッド内からマルチプロセッシングを使用して実行を並列化する必要があります。

交換するだけだと思ってた

class Server(object)
    @cherrypy.expose
    def expensive_method(self):
        ...
        x = map(fnc, args)
        ...

def fnc(args):
    # this method doesn't need CherryPy but is expensive to compute
    ...

cherrypy.quickstart(Server())

(これは正常に動作します)

    def expensive_method(self):
        pool = Pool()
        x = pool.map(fnc, args)
        pool.terminate()

しかし、それはうまくいきません。単純なケースでも、プールをまったく使用しない場合は、

    def expensive_method(self):
        pool = Pool()
        x = map(fnc, args) # <== no pool here! same as the working example
        pool.terminate()

例外が発生します

[08/Jan/2013:20:05:33] ENGINE Caught signal SIGTERM.
2013-01-08 20:05:33,919 : INFO : _cplogging:201 : error(CP Server Thread-3) : [08/Jan/2013:20:05:33] ENGINE Caught signal SIGTERM.
[08/Jan/2013:20:05:33] ENGINE Bus STOPPING
2013-01-08 20:05:33,920 : INFO : _cplogging:201 : error(CP Server Thread-3) : [08/Jan/2013:20:05:33] ENGINE Bus STOPPING
[08/Jan/2013:20:05:38] ENGINE Error in 'stop' listener <bound method Server.stop of <cherrypy._cpserver.Server object at 0x1090c3c90>>
Traceback (most recent call last):
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/wspbus.py", line 197, in publish
    output.append(listener(*args, **kwargs))
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/servers.py", line 223, in stop
    wait_for_free_port(*self.bind_addr)
  File "/Volumes/work/workspace/vew/prj/lib/python2.7/site-packages/cherrypy/process/servers.py", line 410, in wait_for_free_port
    raise IOError("Port %r not free on %r" % (port, host))
IOError: Port 8888 not free on '127.0.0.1'

これは、リクエストの最後、またはリクエストの最中に発生すると思いますpool.terminate()

フォークされたワーカー プロセスは、サーバーやポート自体には何もしません。「サーバービット」を無視するようにCherryPyおよび/またはマルチプロセッシングに指示する方法はありますか? そこにポートやソケットは必要ありませんfnc

Python 2.7.1 と CherryPy 3.2.2 を使用して、OSX + Linux で動作するにはこれが必要です。


進行状況 1:

Sylvain の提案に従って、試してみpool = Pool(initializer=cherrypy.server.unsubscribe)ました。例外はもうありません。すべて正常に動作しますが、ログに表示されます

[08/Jan/2013:21:16:35] ENGINE Caught signal SIGTERM.
2013-01-08 21:16:35,908 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Caught signal SIGTERM.
[08/Jan/2013:21:16:35] ENGINE Bus STOPPING
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus STOPPING
[08/Jan/2013:21:16:35] ENGINE Bus STOPPED
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus STOPPED
[08/Jan/2013:21:16:35] ENGINE Bus EXITING
2013-01-08 21:16:35,909 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus EXITING
[08/Jan/2013:21:16:35] ENGINE Bus EXITED
2013-01-08 21:16:35,910 : INFO : _cplogging:201 : error(CP Server Thread-10) : [08/Jan/2013:21:16:35] ENGINE Bus EXITED

それでいいですか?これにより問題が発生する可能性はありますか (たとえば、異なるスレッドで複数のリクエストを同時に処理する場合)。


進行状況 2:

実際、上記は時々アイドルプロセスを置き去りにします:(したがって、うまく動作しません。奇妙なことに、これらのアイドルプロセスはによって生成されたPoolため、デーモンである必要がありますが、親を殺した後でも実際には生き続けます。


進行状況 3:

フォーク (=Pool()呼び出し) をリクエスト処理メソッドの外に移動しましたが、必要なすべての状態を初期化した(ワーカー プロセスがこの状態を確認できるように) しました。エラーや例外はもうありません。

結論: マルチプロセッシングとマルチスレッドは一緒には機能しません。

4

1 に答える 1

4

'self'はどのタイプのオブジェクトを指しますか?どの時点で、フォークされたプロセスを初期化して開始しますか?おそらく、もう少しコードが問題の診断に役立つでしょう。

さて、これはうまく機能します:

import multiprocessing
import os
import time

import cherrypy

def run_in_sub_proc(size):
    for i in range(size):
        print os.getpid(), i
        time.sleep(1)

pool = multiprocessing.Pool(2)

class Root(object):
    @cherrypy.expose
    def index(self):
        pool.map_async(run_in_sub_proc, (3, 5))

if __name__ == '__main__':
    cherrypy.engine.subscribe('stop', pool.join)
    cherrypy.quickstart(Root())
于 2013-01-08T19:41:59.777 に答える