3

私はthreading.Thread、通常の動作とは対照的に、メソッドが呼び出されたオブジェクトによって表されるスレッドでメソッドを呼び出して実行できるようにするサブクラスに取り組んでいます。これを行うには、メソッドへの呼び出しを a に配置するターゲット メソッドでデコレータをcollections.deque使用し、メソッドを使用しrunて両端キューを処理します。

このrunメソッドは、while not self.__stop:ステートメントとthreading.Conditionオブジェクトを使用して、呼び出しが両端キューに配置されるのを待ってから、 を呼び出しますself.__process_calls。ループのelse部分は、への最後の呼び出しを行います。の場合、別のスレッドから「呼び出し可能な」メソッドのいずれかを呼び出そうとすると、例外が発生します。while__process_callsself.__stop

問題は、最後のステートメントがデバッグ中に発見し__process_callsたものでない限り、返されないことです。print私はa = 1明示的に試しましreturnたが、どちらも機能しません。printただし、関数の最後のステートメントとして任意のステートメントを使用すると、戻り、スレッドはハングしません。何が起こっているのですか?

編集:David Zaslavskyによって、印刷には時間がかかるため機能することが指摘され、私はそれを確認しました

コードは少し長いですが、上記の説明が理解に役立つほど明確であることを願っています。

import threading
import collections    

class BrokenPromise(Exception): pass    
class CallableThreadError(Exception): pass    
class CallToNonRunningThreadError(CallableThreadError): pass   


class Promise(object):
    def __init__(self, deque, condition):
        self._condition = condition
        self._deque = deque

    def read(self, timeout=None):
        if not self._deque:
            with self._condition:
                if timeout:
                    self._condition.wait(timeout)
               else:
                    self._condition.wait()
        if self._deque:
            value = self._deque.popleft()
            del self._deque
            del self._condition
            return value
        else:
           raise BrokenPromise

    def ready(self):
        return bool(self._deque) 

class CallableThread(threading.Thread):
    def __init__(self, *args, **kwargs): 
        # _enqueued_calls is used to store tuples that encode a function call.
        # It is processed by the run method 
        self.__enqueued_calls = collections.deque() 
        # _enqueue_call_permission is for callers to signal that they have
        # placed something on the queue 
        self.__enqueue_call_permission = threading.Condition()
        self.__stop = False
        super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod
    def blocking_method(f): 
        u"""A decorator function to implement a blocking method on a thread""" 
        # the returned function enqueues the decorated function and blocks
        # until the decorated function# is called and returns. It then returns
        # the value unmodified. The code in register runs in the calling thread
        # and the decorated method runs in thread that it is called on 
        f = CallableThread.nonblocking_method_with_promise(f)
        def register(self, *args, **kwargs):
            p = f(self, *args, **kwargs)
            return p.read()
        return register

    @staticmethod 
    def nonblocking_method_with_promise(f):
        u"""A decorator function to implement a non-blocking method on a
        thread
        """ 
        # the returned function enqueues the decorated function and returns a
        # Promise object.N The code in register runs in the calling thread 
        # and the decorated method runs in thread that it is called on. 
        def register(self, *args, **kwargs): 
            call_complete = threading.Condition() 
            response_deque = collections.deque()
            self.__push_call(f, args, kwargs, response_deque, call_complete)
            return Promise(response_deque, call_complete)
        return register

    @staticmethod
    def nonblocking_method(f):
        def register(self, *args, **kwargs):
            self.__push_call(f, args, kwargs)
        return register

    def run(self):        
        while not self.__stop:  # while we've not been killed 
            with self.__enqueue_call_permission:
                # get the condition so that we can wait on it if we need too. 
                if not self.__enqueued_calls: 
                    self.__enqueue_call_permission.wait() 
            self.__process_calls()
        else:
            # if we exit because self._run == False, finish processing
            # the pending calls if there are any
            self.__process_calls()

    def stop(self): 
        u""" Signal the thread to stop"""
        with self.__enqueue_call_permission:
           # we do this in case the run method is stuck waiting on an update
           self.__stop = True
           self.__enqueue_call_permission.notify()

    def __process_calls(self):
        print "processing calls"
        while self.__enqueued_calls:
            ((f,  args, kwargs),
            response_deque, call_complete) = self.__enqueued_calls.popleft()
            if call_complete:
                with call_complete:
                    response_deque.append(f(self, *args, **kwargs)) 
                    call_complete.notify()
            else:
                f(self, *args, **kwargs)
        # this is where you place the print statement if you want to see the
        # behavior        

    def __push_call(self, f, args, kwargs, response_deque=None,
                    call_complete=None):
        if self.__stop:
            raise CallToNonRunningThreadError(
                  "This thread is no longer accepting calls")
        with self.__enqueue_call_permission:
            self.__enqueued_calls.append(((f, args, kwargs),
                                           response_deque, call_complete))
            self.__enqueue_call_permission.notify()


#if __name__=='__main__':      i lost the indent on the following code in copying but
#it doesn't matter in this context
class TestThread(CallableThread): 
    u"""Increment a counter on each call and print the value""" 
    counter = 0

    @CallableThread.nonblocking_method_with_promise
    def increment(self): 
        self.counter += 1
        return self.counter

class LogThread(CallableThread):

    @CallableThread.nonblocking_method
    def log(self, message):
        print message

l = LogThread()
l.start()
l.log("logger started")
t = TestThread() 
t.start()
l.log("test thread started")
p = t.increment()
l.log("promise aquired")
v = p.read()
l.log("promise read")
l.log("{0} read from promise".format(v))
l.stop()
t.stop()
l.join()
t.join()
4

1 に答える 1

1
  1. __process_calls__enqueued_callsロックを所有せずに変更しています。これにより、競合状態が発生している可能性があります。

  2. 編集: deque は「スレッドセーフ」(つまり、スレッド アクセスによって破損しない) かもしれませんが、その状態のチェックは引き続きロックする必要があります。

  3. 停止条件も安全ではありません。

インラインコメント:

def run(self):        
    while not self.__stop:  # while we've not been killed 
        with self.__enqueue_call_permission:
            # get the condition so that we can wait on it if we need too. 
            ### should be checking __stop here, it could have been modified before
            ### you took the lock.
            if not self.__enqueued_calls: 
                self.__enqueue_call_permission.wait() 
        self.__process_calls()
    else:
        # if we exit because self._run == False, finish processing
        # the pending calls if there are any
        self.__process_calls()
于 2010-07-30T03:30:02.500 に答える