14

次の Python コードがあるとします。

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

問題は、あるスレッドで "a" を反復し、同時に別のスレッドで "b" を反復し始めた場合に問題が発生するかどうかです。明らかに、a と b はいくつかのデータを共有しています (元の iterable、+ 追加のもの、内部バッファーなど)。では、a.next() と b.next() は、この共有データにアクセスするときに適切なロックを行うでしょうか?

4

4 に答える 4

2

ドキュメントに同等のコードが示されている場合は、次のようになります。

正しい場合、いいえ、スレッドセーフではありません。

deque はスレッド セーフな追加とポップを行うことが文書化されていますが、 dequeを使用するコードを保証するものではないことに注意してください。

メイン コードは複数のスレッドの要素に対して基になる反復子を要求する可能性があるため、tee を安全にするには、スレッド セーフなコレクションと反復子を入力として使用する必要があります。

于 2011-07-15T07:36:06.130 に答える
0

C-Python では、itertools.tee()それが返すイテレータは C コードで実装されています。つまり、GIL は複数のスレッドから同時に呼び出されないように保護する必要があります。おそらく正しく動作し、インタープリターがクラッシュすることはありませんが、スレッドセーフであるとは限りません。

簡単に言えば、リスクを冒さないでください。

于 2011-07-15T07:42:06.190 に答える
0

Python 3.6.9 および 3.7.4 環境で、itertools.tee を使用して大きなサイズの plat テキスト ファイルを s3 との間で複数の csv ファイルに分割したときの経験を共有したいと思います。

私のデータフローは、s3 zipfile、s3fs read iter、map iter for dataclass transform、tee iter、map iter for dataclass filter、 iter をループしてデータをキャプチャし、s3fs write および/または local write を使用して csv 形式で s3 に書き込みます。 s3fs を s3 に置きます。

zipfile プロセス スタックで itertools.tee が失敗しました。

上記の Dror Speiser、safetee は正常に動作しましたが、データセットが適切に配布されなかったり、処理の遅延が発生したりするため、tee オブジェクト間の不均衡のためにメモリ使用量が増加しました。また、マルチプロセッシング ロギングでは適切に動作しませんでした。このバグに関連している可能性があります: https://bugs.python.org/issue34410

以下のコードは、tee オブジェクトの間に単純なフロー制御を追加して、メモリのインクリメントと OOM Killer の状況を防ぐだけです。

今後の参考になれば幸いです。

import time
import threading
import logging
from itertools import tee
from collections import Counter

logger = logging.getLogger(__name__)


FLOW_WAIT_GAP = 1000  # flow gap for waiting
FLOW_WAIT_TIMEOUT = 60.0  # flow wait timeout


class Safetee:
    """tee object wrapped to make it thread-safe and flow controlled"""

    def __init__(self, teeobj, lock, flows, teeidx):
        self.teeobj = teeobj
        self.lock = lock
        self.flows = flows
        self.mykey = teeidx
        self.logcnt = 0

    def __iter__(self):
        return self

    def __next__(self):
        waitsec = 0.0
        while True:
            with self.lock:
                flowgap = self.flows[self.mykey] - self.flows[len(self.flows) - 1]
                if flowgap < FLOW_WAIT_GAP or waitsec > FLOW_WAIT_TIMEOUT:
                    nextdata = next(self.teeobj)
                    self.flows[self.mykey] += 1
                    return nextdata

            waitthis = min(flowgap / FLOW_WAIT_GAP, FLOW_WAIT_TIMEOUT / 3)
            waitsec += waitthis

            time.sleep(waitthis)

            if waitsec > FLOW_WAIT_TIMEOUT and self.logcnt < 5:
                self.logcnt += 1
                logger.debug(f'tee wait seconds={waitsec:.2f}, mykey={self.mykey}, flows={self.flows}')

    def __copy__(self):
        return Safetee(self.teeobj.__copy__(), self.lock, self.flows, self.teeidx)


def safetee(iterable, n=2):
    """tuple of n independent thread-safe and flow controlled iterators"""
    lock = threading.Lock()
    flows = Counter()
    return tuple(Safetee(teeobj, lock, flows, teeidx) for teeidx, teeobj in enumerate(tee(iterable, n)))


于 2020-08-28T15:03:07.290 に答える