1

1 つのプロセスはソケットからデータを受信して​​キューに入れ、もう 1 つのプロセスはキューに入れられたデータを処理しています。両方を同時に実行するにはどうすればよいですか?

このソケットは serve_forever ですが、キューが空でない場合にのみデータの処理が実行されます。

4

2 に答える 2

1

私はそれを動かしています。これが答えかどうかはわかりません。しかし、これまでのところ、うまく機能しています。バグがあるか、バグがあるかもしれません(うまくいけばありません)。改善のための提案は大歓迎です。

import multiprocessing
import socket
from multiprocessing import Process, Queue
import time

def handle(connection, address):
  try:
    while True:
      data = connection.recv(1024)
      if data == "":
        break
      else :
        print "RECEIVE DATA : " + str(data)
        xdata = data.strip()
        xdata = data.split(" ")
        for xd in xdata :
          print "PUT Task : " + str(xd)
          QueueTask.put((xd), block=True, timeout=5)
      connection.sendall(data)
  except:
    print "Problem handling request"
  finally:
    connection.close()

class Server(object):
  def __init__(self, hostname, port):
    self.hostname = hostname
    self.port = port

  def start(self):
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.socket.bind((self.hostname, self.port))
    self.socket.listen(1)
    while True:
      conn, address = self.socket.accept()
      process = multiprocessing.Process(target=handle, args=(conn, address))
      process.daemon = True
      process.start()

def f_Processor():
  time.sleep(10)
  print 'PROCESSOR Starting'
  while 1:
    try :
      job = QueueTask.get(True,1)
      print "GET Task : " + str(job)
      time.sleep(5)
    except Exception as err :
      pass
  print 'PROCESSOR Exiting'

if __name__ == "__main__":
  server = Server("localhost", 9999)
  QueueTask = Queue()
  try:
    p = multiprocessing.Process(name='Processing', target=f_Processor)
    p.start()
    server.start()
    p.join()
  except:
    print "Unexpected exception"
  finally:
    for process in multiprocessing.active_children():
      process.terminate()
      process.join()
  print "All done"
于 2013-03-19T08:53:29.910 に答える
0

また、サーバー アプリケーションまたはクライアント アプリケーションがある場合にも依存します。それがあなたが使用できるよりもサーバーである場合

    SocketServer.TCPServer.allow_reuse_address = True       

    self.server = TCPFactory( ( HOST, PORT ), TCPRequestHandler, params )                      

    # Start a thread with the server        
    self.server_thread = threading.Thread( target = self.server.serve_forever )


    self.server_thread.setDaemon( True )       
    self.server_thread.start()

class TCPFactory( SocketServer.ThreadingTCPServer ):    


    def __init__( self, server_address, RequestHandlerClass, params ):
        """
        """

        SocketServer.ThreadingTCPServer.__init__( self, server_address, RequestHandlerClass )
        self.patrams = params


class TCPRequestHandler( SocketServer.BaseRequestHandler ):

    def setup( self ):
       print self.server.params
       pass
    def handle( self ):
       pass

したがって、クライアントがサーバーに接続すると、新しいスレッドが開始されます。setupand 関数はhandler自動的に呼び出されます。他のスレッドについては、タイマーまたは他のスレッドを使用できます

myt = Timer( 2, chackque, () )
myt.start()

def chackque():
   if not myq.empty():
   #Do what you want

または、別のスレッドを開始します。

mythread = threading.Thread( target = chackque, args = ( myargs, ) )        
mythread.setDaemon( True )
mythread.start()

def chackque():
   while True:
       if not myq.empty():
       #Do what you want
于 2013-03-14T07:35:54.680 に答える