2

UDP メッセージを収集して 1 秒ごとに処理するアプリケーションを作成しています。

アプリケーションのプロトタイプは次のようになります。

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):

    messages = []

    def datagramReceived(self, data, (host, port)):
        self.messages.append(data)

class Messenger(threading.Thread):

    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.messages
            length = len(recivedMessages)
            messagesToProccess = recivedMessages[0:length]
            #doSomethingWithMessages(messagesToProccess)
            del self.listener.messages[0:length]
            print(length)

listener = UdpListener()

messenger = Messenger()
messenger.listener = listener
messenger.start()

reactor.listenUDP(5556, listener)
reactor.run()

着信メッセージがリストを変更してアプリケーションがクラッシュするリスクなしに、リスト (del self.listener.messages[0:length]) から開始値を簡単に削除できるかどうかはわかりません。

更新- ロック付きバージョン

class Messenger(threading.Thread):

listener = None
lock = threading.Lock()

def __init__(self):
    threading.Thread.__init__(self)

def run(self):
    while True:
        time.sleep(1)
        recivedMessages = self.listener.messages
        self.lock.acquire()
        try:
            length = len(recivedMessages)
            messagesToProccess = recivedMessages[0:length]
            del self.listener.messages[0:length]
        except Exception as e:
            raise e
        finally:
            self.lock.release()

        #doSomethingWithMessages(messagesToProccess)
        print(length)
4

2 に答える 2

6

あなたのコードはスレッドセーフではありません。あなたは周りにロックをかける必要があるでしょうmessages

ただし、ここにスレッドは必要ありません。なぜこれをしないのですか?

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

class UdpListener(DatagramProtocol):
    callingLater = False

    messages = []

    def process(self):
        doSomethingWithMessages(self.messages)
        self.messages = []
        self.callingLater = False

    def datagramReceived(self, data, (host, port)):
        self.messages.append(data)
        if not self.callingLater:
            reactor.callLater(1.0, self.process)
            self.callingLater = True

listener = UdpListener()

reactor.listenUDP(5556, listener)
reactor.run()

更新:これは、教育目的でのみ、元のバージョンがロックでどのように機能するかを示しています。これはそれほど効率的ではなく、バグが発生しやすいことに注意してください。編集:すべてのメッセージロジックを分離して、UdpListenerそれを使用するクラスがそのねばねばした内部の詳細を知る必要がないようにしました。

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    message_lock = threading.Lock()
    messages = []

    def datagramReceived(self, data, (host, port)):
        with self.message_lock:
            self.messages.append(data)

    def getAndClearMessages(self):
        with self.message_lock:
            res = self.messages
            self.messages = []
        return res

class Messenger(threading.Thread):

    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.getAndClearMessages()
            length = len(recivedMessages)
            #doSomethingWithMessages(recivedMessages)
            print(length)

listener = UdpListener()

messenger = Messenger()
messenger.listener = listener
messenger.start()

reactor.listenUDP(5556, listener)
reactor.run()
于 2012-07-06T21:29:40.773 に答える
0

これを DeferredQueue で実装しないのはなぜですか。これはまさにこの目的のためのものです。スレッドを使用する場合は、特別な注意が必要です。

スレッド化を可能にする DeferredQueue の例を次に示します。

class UdpListener(DatagramProtocol):

    def __init__(self)
        self._messages = DeferredQueue()

    def datagramReceived(self, data, (host, port)):
        self._messages.put(message)

    @inlineCallbacks
    def _wait_for_and_process_next_message(self):

        # Get message from queue through a deferred call from the DeferredQueue
        # Here we use @inlineCallbacks, so we assign the result from yield 
        # which is the new message, and will "block" (actually releasing control to Twisted) until a message gets in
        message = yield self._message_queue.get()

        # Do something with your message here, and ensure you catch any exceptions!
        # If your message processing may be long, you may wish to run it in another thread,
        # and because of @inlineCallbacks, this call will "block" until your function finishes.
        # In case you did this, ensure you read the notes below.
        yield threads.deferToThread(my_long_function, message)

        # Schedule an immediate call to this method again in order to process next message
        self.wait_for_and_process_next_message()

    def wait_for_and_process_next_message(self):
        reactor.callLater(0, self._wait_for_and_process_next_message)

    def initialize(self):
        # Call this during your application bootstrapping, so you start processing messages
        self.wait_for_and_process_next_message()

メッセージ処理を ( を使用して) Twisted スレッド プールに延期することを選択した場合threads.deferToThread、コードは別のスレッドで実行されることに注意することが非常に重要です。別のスレッドからのメッセージに応答する可能性が高く、Twisted ではプロトコルはスレッドセーフ オブジェクトではありません ( http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0 )。

この場合、次の例のようにreactor.callFromThread()、重要なリソースを保護するために使用します。transport

def _send_message_critical_section(self, message):
    self.transport.write(message, (self.host, self.port))

def send_message(self, message):
    reactor.callFromThread(self._send_message_critical_section, message)

その他の変更:

  • 完全に非公開と見なす必要があるため、messages変数の名前を に変更しました。_messages
  • _messages初期化をメソッド内に移動し、__init__()に割り当てましたself._messages。そうしないと、メッセージ リストがすべてのインスタンス間で共有されます。クラスのインスタンスは 1 つしかなかったと思いますが... (クラス __init__() 関数内外の変数)
于 2015-09-04T17:10:44.947 に答える