2

2 つのスレッド (プロデューサーとコンシューマー) があり、データを と共有していますQueue。問題は、プロデューサーを強制的に中止すると、コンシューマーがロックされることがあることです。

キューを使用してスレッドをキャンセルするとキューが破損し、デッドロックが発生する可能性があることをドキュメントで読みました。私は明示的にロックを取得していませんが、Queue.py のソースを読むと、そうであることがわかりputますget

getスレッドを中止すると、 /の途中にある可能性があることを知っている人はputいますか?つまり、ロックを使用してから解放しませんか? 私はそれについて何ができますか?プロデューサーを途中で終了する必要がある場合があります。スレッドの代わりにプロセスを使用すると、違いはありますか?

4

2 に答える 2

0

ほとんどの場合、デッドロックはスレッドが終了していないことが原因です。Linux を使用している場合は、pyrasiteのインジェクターを使用してバックトレースを出力できます (プログラムがハングした場所がわかります)。

シグナル ハンドラでロックを使用している場合は、おそらくこれがデッドロックです (これは少し複雑です。説明が必要かどうか尋ねてください)。

スレッドの代わりにプロセスを作成すると状況は確実に変わりますが、データ交換と同期は非常に複雑であることを忘れないでください。

于 2012-05-25T21:59:20.990 に答える
0

たぶんこれが役立ちます:

import threading

class MyQueue:
    def __init__(self):
        self.tasks = []
        self.tlock = threading.Semaphore(0)
        self.dlock = threading.Lock()
        self.aborted = False

    def put(self, arg):
        try:
            self.dlock.acquire()
            self.tasks.append(arg)
        finally:
            self.dlock.release()
            self.tlock.release()

    def get(self):
        if self.aborted:
            return None
        self.tlock.acquire()
        if self.aborted:
            self.tlock.release()
            return None
        try:
            self.dlock.acquire()
            if self.tasks:
                return self.tasks.pop()
            else: # executed abort
                return None
        finally:
            self.dlock.release()

    def abort(self):
        self.aborted = True
        self.tlock.release()

# TESTING

mq = MyQueue()
import sys

def tlog(line):
    sys.stdout.write("[ %s ] %s\n" % (threading.currentThread().name, line))
    sys.stdout.flush()

def reader():
    arg = 1
    while arg is not None:
        tlog("start reading")
        arg = mq.get()
        tlog("read: %s" % arg)
    tlog("END")

import time, random
def writer():
    try:
        pos = 1
        while not mq.aborted:
            x = random.random() * 5
            tlog("writer sleep (%s)" % x)
            pending = x
            while pending > 0:
                tosleep = min(0.5, pending)
                if mq.aborted:
                    return
                time.sleep(tosleep)
                pending -= tosleep

            tlog("write: %s" % x)
            mq.put("POS %s  val=%s" % (pos, x))
            pos += 1
    finally:
        tlog("writer END")

def testStart():
    try:
        for i in xrange(9):
            th = threading.Thread(None, reader, "reader %s" % i, (), {}, None)
            th.start()
        for i in xrange(3):
            th = threading.Thread(None, writer, "writer %s" % i, (), {}, None)
            th.start()
        time.sleep(30) # seconds for testing
    finally:
        print "main thread: abort()"
        mq.abort()

if __name__ == "__main__":
    testStart()
于 2012-05-25T22:40:52.140 に答える