4

着信接続のすべての処理を行うためにプリロードされた別のスレッドを持つ python3.3 でサーバーを実装しようとしています。

from multiprocessing import Process, Pipe, Queue
from multiprocessing.reduction import reduce_socket
import time
import socketserver,socket


def process(q):
    while 1:
        fn,args = q.get()
        conn = fn(*args)

        while conn.recv(1, socket.MSG_PEEK):
            buf = conn.recv(100)
            if not buf: break
            conn.send(b"Got it: ")
            conn.send(buf)

        conn.close()

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print("Opening connection")
        print("Processing")
        self.server.q.put(reduce_socket(self.request))
        while self.request.recv(1, socket.MSG_PEEK):
            time.sleep(1)
        print("Closing connection")

class MyServer(socketserver.ForkingTCPServer):
    p = Process
    q = Queue()
    parent_conn,child_conn = Pipe()
    def __init__(self,server_address,handler):
        socketserver.ForkingTCPServer.__init__(self,server_address, handler)
        self.p = Process(target=process,args=(self.q,))
        self.p.start()
    def __del__(self):
        self.p.join()


server_address = ('',9999)
myserver = MyServer(server_address,MyHandler)
myserver.serve_forever()

次のスクリプトを使用して動作することをテストできます。

from multiprocessing.reduction import reduce_socket
import time
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 9999))

time.sleep(1)
print("reduce_socket(s)")
fn,args = reduce_socket(s)
time.sleep(1)
print("rebuild_socket(s)")
conn = fn(*args)
time.sleep(1)
print("using_socket(s)")

conn.send("poks")
print conn.recv(255)
conn.send("poks")
print conn.recv(255)
conn.send("")
print conn.recv(255)
conn.close()

残念ながら、テストを n 回実行した後、私の tmp-folder はサブフォルダーで満たされているため、何か問題があるようです。

$ ls /tmp/pymp*|wc -l
32000

これらの一時ファイルは、によって作成されsocket_reduce()ます。興味深いことに、rebuild/reduce_socket()クライアントの も一時ファイルを作成しますが、関数が終了すると削除されます。現在の tmp-filesystem 内のフォルダーの最大数は 32000 であり、問​​題が発生します。/tmp/pymp*-files を手動またはサーバーのどこかで削除することもできますが、これを行う正しい方法もあるはずです。誰でもこれで私を助けることができますか?

4

1 に答える 1

0

わかりました、ちょっと修正しました。から 。./lib/python3.3/multiprocessing/util.py:

$ grep "def get_temp_dir" -B5 /usr/local/lib/python3.3/multiprocessing/util.py

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():

プロセスが終了するまで、一時ディレクトリが利用可能であるようです。myprocess()main()両方とも永久に実行されるため、一時ファイルは削除されません。それを修正するには、reduced_socket を に渡す別のプロセスを作成しますprocess()

def process(q):
    while 1:
        fn,args = q.get()
        conn = fn(*args)

        while conn.recv(1, socket.MSG_PEEK):
            buf = conn.recv(100)
            if not buf: break
            conn.send(b"Got it: ")
            conn.send(buf)

        conn.close()
        q.put("ok")



class MyHandler(socketserver.BaseRequestHandler):
    def socket_to_process(self,q):
        q.put(reduce_socket(self.request))
        q.get()
    def handle(self):
        p = Process(target=self.socket_to_process,args=(self.server.q,))
        p.start()
        p.join()

このようにして、一時ファイルはサブプロセスで作成され、サブプロセスprocess()は入力で処理を完了すると終了します。これはエレガントな方法だとは思いませんが、うまくいきます。誰かがよく知っている場合は、stackoverflow に知らせてください。

于 2013-03-27T09:47:58.233 に答える