6

私は最近、単純なプロデューサー/コンシューマー パターンを使用するプログラムを作成しました。最初は、threading.Lock の不適切な使用に関連するバグがありましたが、最終的に修正しました。しかし、生産者/消費者パターンをロックレスで実装することは可能かどうか考えさせられました。

私の場合の要件は単純でした:

  • 1 つのプロデューサー スレッド。
  • 1 つのコンシューマ スレッド。
  • キューには 1 つのアイテムしか入れられません。
  • プロデューサーは、現在のアイテムが消費される前に次のアイテムを生成できます。したがって、現在のアイテムは失われますが、それで問題ありません。
  • 消費者は、次のアイテムが生成される前に現在のアイテムを消費できます。したがって、現在のアイテムは 2 回 (またはそれ以上) 消費されますが、それは問題ありません。

だから私はこれを書いた:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

私の質問は: このコードはスレッドセーフですか?

即時コメント: このコードは実際にはロックレスではありません - 私は CPython を使用しており、GIL を持っています。

コードを少しテストしたところ、うまくいくようです。これは、GIL のためにアトミックな LOAD および STORE 操作に変換されます。del xしかし、x がメソッドを実装する場合、操作はアトミックではないことも知ってい__del__ます。したがって、アイテムに__del__メソッドがあり、厄介なスケジューリングが発生した場合、問題が発生する可能性があります。か否か?

もう 1 つの質問は、上記のコードを正常に動作させるには、どのような制限 (たとえば、生産されるアイテムの種類) を課す必要があるかということです。

私の質問は、CPython と GIL の癖を利用してロックレス (つまり、threading.Lock のようなロックをコードで明示的に使用しない) ソリューションを考え出す理論的な可能性についてのみです。

4

6 に答える 6

6

トリックはあなたを噛むでしょう。キューを使用してスレッド間で通信するだけです。

于 2009-05-12T21:50:51.433 に答える
2

はい、これはあなたが説明した方法で機能します:

  1. プロデューサーがスキップ可能な要素を生成できること。
  2. 消費者が同じ要素を消費できること。

しかし、x がdelメソッドを実装している場合、del x 操作はアトミックではないことも知っています。したがって、アイテムにdelメソッドがあり、厄介なスケジューリングが発生した場合、問題が発生する可能性があります。

ここに「デル」はありません。consumer_item でdelが発生した場合、del はプロデューサー スレッドで発生する可能性があります。これは「問題」ではないと思います。

ただし、これを使用する必要はありません。無意味なポーリング サイクルで CPU を使い果たすことになり、Python には既にグローバル ロックがあるため、ロック付きのキューを使用するよりも高速ではありません。

于 2009-05-13T02:03:19.270 に答える
1

コンシューマーがそれを消費する前にプロデューサーが上書きし、コンシューマーが2回消費する可能性があるため、これは実際にはスレッドセーフではありません。あなたが言ったように、あなたはそれで大丈夫ですが、ほとんどの人はそうではありません。QUEUE_ITEMQUEUE_ITEM

cpythonの内部に関する知識が豊富な人は、より理論的な質問に答える必要があります。

于 2009-05-12T21:31:55.553 に答える
0

特にアイテムが大きなオブジェクトの場合、生成/消費中にスレッドが中断される可能性があると思います。編集:これは単なる推測です。私は専門家ではありません。

また、スレッドは、他のスレッドが実行を開始する前に、任意の数のアイテムを生成/消費する可能性があります。

于 2009-05-12T21:32:56.987 に答える
0

どちらもアトミックであるため、append / popに固執する限り、リストをキューとして使用できます。

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

以下のようなクラススコープでは、キューをクリアすることもできます。

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

生成されたバイトコードを調べて、どのリスト操作がアトミックであるかを確認する必要があります。

于 2009-05-12T21:34:05.080 に答える
0

あなたが言ったように、それ__del__は問題になる可能性があります。ガベージ コレクター__del__が新しいオブジェクトをQUEUE_ITEM. 次のようなものが必要です。

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

恐れ入りますが、それが可能かどうかはわかりません。

于 2009-05-12T22:07:49.723 に答える