Python 3.6.9 および 3.7.4 環境で、itertools.tee を使用して大きなサイズの plat テキスト ファイルを s3 との間で複数の csv ファイルに分割したときの経験を共有したいと思います。
私のデータフローは、s3 zipfile、s3fs read iter、map iter for dataclass transform、tee iter、map iter for dataclass filter、 iter をループしてデータをキャプチャし、s3fs write および/または local write を使用して csv 形式で s3 に書き込みます。 s3fs を s3 に置きます。
zipfile プロセス スタックで itertools.tee が失敗しました。
上記の Dror Speiser、safetee は正常に動作しましたが、データセットが適切に配布されなかったり、処理の遅延が発生したりするため、tee オブジェクト間の不均衡のためにメモリ使用量が増加しました。また、マルチプロセッシング ロギングでは適切に動作しませんでした。このバグに関連している可能性があります: https://bugs.python.org/issue34410
以下のコードは、tee オブジェクトの間に単純なフロー制御を追加して、メモリのインクリメントと OOM Killer の状況を防ぐだけです。
今後の参考になれば幸いです。
import time
import threading
import logging
from itertools import tee
from collections import Counter
logger = logging.getLogger(__name__)
FLOW_WAIT_GAP = 1000 # flow gap for waiting
FLOW_WAIT_TIMEOUT = 60.0 # flow wait timeout
class Safetee:
"""tee object wrapped to make it thread-safe and flow controlled"""
def __init__(self, teeobj, lock, flows, teeidx):
self.teeobj = teeobj
self.lock = lock
self.flows = flows
self.mykey = teeidx
self.logcnt = 0
def __iter__(self):
return self
def __next__(self):
waitsec = 0.0
while True:
with self.lock:
flowgap = self.flows[self.mykey] - self.flows[len(self.flows) - 1]
if flowgap < FLOW_WAIT_GAP or waitsec > FLOW_WAIT_TIMEOUT:
nextdata = next(self.teeobj)
self.flows[self.mykey] += 1
return nextdata
waitthis = min(flowgap / FLOW_WAIT_GAP, FLOW_WAIT_TIMEOUT / 3)
waitsec += waitthis
time.sleep(waitthis)
if waitsec > FLOW_WAIT_TIMEOUT and self.logcnt < 5:
self.logcnt += 1
logger.debug(f'tee wait seconds={waitsec:.2f}, mykey={self.mykey}, flows={self.flows}')
def __copy__(self):
return Safetee(self.teeobj.__copy__(), self.lock, self.flows, self.teeidx)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe and flow controlled iterators"""
lock = threading.Lock()
flows = Counter()
return tuple(Safetee(teeobj, lock, flows, teeidx) for teeidx, teeobj in enumerate(tee(iterable, n)))