0

1 つのプロセス (プロセス 'A') がリクエストを受け取り、それを (concurrent.futures から) ProcessPool に入れる Python アプリケーションを作成しようとしています。この要求を処理する際に、メッセージを 2 番目のプロセス (プロセス 'B') に渡す必要がある場合があります。トルネードの iostream モジュールを使用して、接続をラップして応答を取得しています。

プロセス A は、ProcessPool 実行内からプロセス B に正常に接続できません。どこが間違っていますか?

プロセス A に最初の要求を行うクライアント:

#!/usr/bin/env python

import socket
import tornado.iostream
import tornado.ioloop

def print_message ( data ):
    print 'client received', data

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(('localhost',2001))
stream.read_until('\0',print_message)
stream.write('test message\0')
tornado.ioloop.IOLoop().instance().start()

最初のリクエストを受け取ったプロセス A:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import tornado.iostream
import socket
import concurrent.futures
import functools

def handle_request ( data ):
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
    out_stream = tornado.iostream.IOStream(s)
    out_stream.connect(('localhost',2002))
    future = out_stream.read_until('\0')
    out_stream.write(data+'\0')
    return future.result()

class server_a (tornado.tcpserver.TCPServer):

   def return_response ( self, in_stream, future ):
       in_stream.write(future.result()+'\0')

   def handle_read ( self, in_stream, data ):
       future = self.executor.submit(handle_request,data)
       future.add_done_callback(functools.partial(self.return_response,in_stream))

   def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

   def __init__ ( self ):
       self.executor = concurrent.futures.ProcessPoolExecutor()
       tornado.tcpserver.TCPServer.__init__(self)

server = server_a()
server.bind(2001)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

プロセス A から中継されたリクエストを受信するプロセス B:

#!/usr/bin/env python

import tornado.ioloop
import tornado.tcpserver
import functools

class server_b (tornado.tcpserver.TCPServer):

    def handle_read ( self, in_stream, data ):
        in_stream.write('server B read'+data+'\0')

    def handle_stream ( self, in_stream, address ):
       in_stream.read_until('\0',functools.partial(self.handle_read,in_stream))

server = server_b()
server.bind(2002)
server.start(0)
tornado.ioloop.IOLoop().instance().start()

最後に、「read_until」メソッドで発生したプロセス A から返されたエラー:

ERROR:concurrent.futures:exception calling callback for <Future at 0x10654b890 state=finished raised OSError>
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 299, in _invoke_callbacks
    callback(self)
  File "./a.py", line 26, in return_response
    in_stream.write(future.result()+'\0')
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "/usr/local/lib/python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
OSError: [Errno 9] Bad file descriptor
4

2 に答える 2

0

この「Bad file descriptor」エラーが発生する理由は 100% わかりませんが (concurrent.futures は残念ながら 2.7 にバックポートされたときにバックトレース情報を失いました)、ProcessPoolExecutor のワーカー プロセスで IOLoop が実行されていないため、勝利しました。このコンテキストでは IOStream のような Tornado コンストラクトを使用することはできません (タスクごとに新しい IOLoop をスピンアップしない限り、ただし、他の非同期ライブラリとの互換性が必要でない限り、あまり意味がないかもしれません)。

このように tornado のマルチプロセス モードと ProcessPoolExecutor を混在させてもうまくいくかどうかもわかりません。start(0) 呼び出しの後まで、 ProcessPoolExecutor の初期化を移動する必要があると思います。

于 2014-05-11T22:44:41.013 に答える