22

Pythonでzipファイルに書き込む必要のある大量のデータ(数ギグ)があります。ZipFileの.writestrメソッドに渡すために一度にすべてをメモリにロードすることはできません。また、一時ファイルを使用してすべてをディスクにフィードしてから読み戻したくありません。

ジェネレーターまたはファイルのようなオブジェクトをZipFileライブラリにフィードする方法はありますか?または、この機能がサポートされていないように見える理由はありますか?

zipファイルとは、zipファイルを意味します。Pythonzipfileパッケージでサポートされています。

4

10 に答える 10

13

唯一の解決策は、ファイルを圧縮してバッファから読み取る方法を書き直すことです。これを標準ライブラリに追加するのは簡単です。まだ実現していないことに少し驚いています。インターフェース全体をオーバーホールする必要があるという多くの合意があり、それが漸進的な改善を妨げているようです.

import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
    def writebuffered(self, zipinfo, buffer):
        zinfo = zipinfo

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(1024 * 8)
            if not buf:
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            self.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
于 2008-11-18T19:23:41.657 に答える
9

Chris B. の回答を参考にして、完全なソリューションを作成しました。他の誰かが興味を持っている場合は、次のとおりです。

import os
import threading
from zipfile import *
import zlib, binascii, struct

class ZipEntryWriter(threading.Thread):
    def __init__(self, zf, zinfo, fileobj):
        self.zf = zf
        self.zinfo = zinfo
        self.fileobj = fileobj

        zinfo.file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = zf.fp.tell()

        zf._writecheck(zinfo)
        zf._didModify = True

        zinfo.CRC = 0
        zinfo.compress_size = compress_size = 0
        zf.fp.write(zinfo.FileHeader())

        super(ZipEntryWriter, self).__init__()

    def run(self):
        zinfo = self.zinfo
        zf = self.zf
        file_size = 0
        CRC = 0

        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            buf = self.fileobj.read(1024 * 8)
            if not buf:
                self.fileobj.close()
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            zf.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            zf.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = zf.fp.tell()
        zf.fp.seek(zinfo.header_offset + 14, 0)
        zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        zf.fp.seek(position, 0)
        zf.filelist.append(zinfo)
        zf.NameToInfo[zinfo.filename] = zinfo

class EnhZipFile(ZipFile, object):

    def _current_writer(self):
        return hasattr(self, 'cur_writer') and self.cur_writer or None

    def assert_no_current_writer(self):
        cur_writer = self._current_writer()
        if cur_writer and cur_writer.isAlive():
            raise ValueError('An entry is already started for name: %s' % cur_write.zinfo.filename)

    def write(self, filename, arcname=None, compress_type=None):
        self.assert_no_current_writer()
        super(EnhZipFile, self).write(filename, arcname, compress_type)

    def writestr(self, zinfo_or_arcname, bytes):
        self.assert_no_current_writer()
        super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

    def close(self):
        self.finish_entry()
        super(EnhZipFile, self).close()

    def start_entry(self, zipinfo):
        """
        Start writing a new entry with the specified ZipInfo and return a
        file like object. Any data written to the file like object is
        read by a background thread and written directly to the zip file.
        Make sure to close the returned file object, before closing the
        zipfile, or the close() would end up hanging indefinitely.

        Only one entry can be open at any time. If multiple entries need to
        be written, make sure to call finish_entry() before calling any of
        these methods:
        - start_entry
        - write
        - writestr
        It is not necessary to explicitly call finish_entry() before closing
        zipfile.

        Example:
            zf = EnhZipFile('tmp.zip', 'w')
            w = zf.start_entry(ZipInfo('t.txt'))
            w.write("some text")
            w.close()
            zf.close()
        """
        self.assert_no_current_writer()
        r, w = os.pipe()
        self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
        self.cur_writer.start()
        return os.fdopen(w, 'w')

    def finish_entry(self, timeout=None):
        """
        Ensure that the ZipEntry that is currently being written is finished.
        Joins on any background thread to exit. It is safe to call this method
        multiple times.
        """
        cur_writer = self._current_writer()
        if not cur_writer or not cur_writer.isAlive():
            return
        cur_writer.join(timeout)

if __name__ == "__main__":
    zf = EnhZipFile('c:/tmp/t.zip', 'w')
    import time
    w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
    w.write("Line1\n")
    w.write("Line2\n")
    w.close()
    zf.finish_entry()
    w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
    w.write("Some text\n")
    w.close()
    zf.close()
于 2010-04-29T01:06:09.613 に答える
8

Python 3.5での変更(公式ドキュメントから):シークできないストリームへの書き込みのサポートが追加されました。

これはzipfile.ZipFile、ファイル全体をメモリに保存しないストリームを使用できるようになったことを意味します。このようなストリームは、データ ボリューム全体にわたる移動をサポートしていません。

したがって、これは単純なジェネレーターです。

from zipfile import ZipFile, ZipInfo

def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed.
    yield stream.get()

path大きなファイル、ディレクトリ、またはpathlikeオブジェクトの文字列パスです。

streamこのようなクラスのシークできないストリームインスタンスです(公式ドキュメントに従って設計されています):

from io import RawIOBase

class UnseekableStream(RawIOBase):
    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk

このコードをオンラインで試すことができます: https://repl.it/@IvanErgunov/zipfilegenerator


ZipInfo大きなファイルを手動で読み取って分割せずにジェネレーターを作成する別の方法もあります。queue.Queue()オブジェクトをオブジェクトに渡し、UnseekableStream()別のスレッドでこのキューに書き込むことができます。次に、現在のスレッドで、反復可能な方法でこのキューからチャンクを簡単に読み取ることができます。ドキュメントを見る

allanlei によるPS Python Zipstreamは時代遅れで信頼性の低い方法です。公式に行われる前に、シークできないストリームのサポートを追加する試みでした。

于 2019-03-14T18:32:47.030 に答える
3

基本的な圧縮はzlib.compressobjによって行われます。ZipFile(MacOSXのPython 2.5ではコンパイルされているようです)。Python2.3のバージョンは次のとおりです。

圧縮ファイルが8kチャンクでビルドされていることがわかります。多くのソースファイル属性(非圧縮サイズなど)がzipファイルヘッダーに記録されるため、ソースファイル情報の取得は複雑です。

def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname."""

    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        zinfo = ZipInfo(filename, date_time)
    else:
        zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = st[0] << 16L      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type
    self._writecheck(zinfo)
    fp = open(filename, "rb")

    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    # Must overwrite CRC and sizes with correct data later
    zinfo.CRC = CRC = 0
    zinfo.compress_size = compress_size = 0
    zinfo.file_size = file_size = 0
    self.fp.write(zinfo.FileHeader())
    zinfo.file_offset = self.fp.tell()      # Start of file bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
             zlib.DEFLATED, -15)
    else:
        cmpr = None
    while 1:
        buf = fp.read(1024 * 8)
        if not buf:
            break
        file_size = file_size + len(buf)
        CRC = binascii.crc32(buf, CRC)
        if cmpr:
            buf = cmpr.compress(buf)
            compress_size = compress_size + len(buf)
        self.fp.write(buf)
    fp.close()
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
          zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo
于 2008-11-18T02:45:19.630 に答える
3

gzip.GzipFile はデータを gzip されたチャンクに書き込みます。ファイルから読み取った行数に応じてチャンクのサイズを設定できます。

例:

file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
  chunks.append(line)
  if len(chunks) >= X: 
      file.write("".join(chunks))
      file.flush()
      chunks = []
于 2008-11-17T23:41:27.253 に答える
1

いくつかの (多くの? ほとんどの?) 圧縮アルゴリズムは、ファイル全体の冗長性を調べることに基づいてい ます。

一部の圧縮ライブラリは、ファイルに最適な圧縮アルゴリズムに基づいて、複数の圧縮アルゴリズムから選択します。

私は ZipFile モジュールがこれを行うと信じているので、一度に断片だけでなく、ファイル全体を見たいと思っています。

したがって、メモリにロードするには大きすぎるジェネレータやファイルでは機能しません。これは、Zipfile ライブラリの制限を説明するものです。

于 2008-11-18T00:11:34.527 に答える
1

Python 2.7 の 2017 年にも関連するこの質問に誰かが遭遇した場合に備えて、他の場合のように出力がシーク可能である必要がない、真のストリーミング zip ファイルの実用的なソリューションを次に示します。その秘密は、汎用ビット フラグのビット 3 を設定することです ( https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXTセクション 4.3.9.1 を参照)。

この実装では常に ZIP64 スタイルのファイルが作成されるため、任意のサイズのファイルに対してストリーミングが機能することに注意してください。中央ディレクトリ レコードの zip64 終了を強制する醜いハックが含まれているため、プロセスによって書き込まれたすべての zip ファイルが ZIP64 スタイルになることに注意してください。

import io
import zipfile
import zlib
import binascii
import struct

class ByteStreamer(io.BytesIO):
    '''
    Variant on BytesIO which lets you write and consume data while
    keeping track of the total filesize written. When data is consumed
    it is removed from memory, keeping the memory requirements low.
    '''
    def __init__(self):
        super(ByteStreamer, self).__init__()
        self._tellall = 0

    def tell(self):
        return self._tellall

    def write(self, b):
        orig_size = super(ByteStreamer, self).tell()
        super(ByteStreamer, self).write(b)
        new_size = super(ByteStreamer, self).tell()
        self._tellall += (new_size - orig_size)

    def consume(self):
        bytes = self.getvalue()
        self.seek(0)
        self.truncate(0)
        return bytes

class BufferedZipFileWriter(zipfile.ZipFile):
    '''
    ZipFile writer with true streaming (input and output).
    Created zip files are always ZIP64-style because it is the only safe way to stream
    potentially large zip files without knowing the full size ahead of time.

    Example usage:
    >>> def stream():
    >>>     bzfw = BufferedZip64FileWriter()
    >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
    >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
    >>>             yield chunk
    >>>     yield bzfw.close()
    '''
    def __init__(self, compression=zipfile.ZIP_DEFLATED):
        self._buffer = ByteStreamer()
        super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)

    def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o600 << 16     # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(chunksize)
            if not buf:
                break

            file_size += len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)

            self.fp.write(buf)
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes

        if cmpr:
            buf = cmpr.flush()
            compress_size += len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Write CRC and file sizes after the file data
        # Always write as zip64 -- only safe way to stream what might become a large zipfile
        fmt = '<LQQ'
        self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))

        self.fp.flush()
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
        yield self._buffer.consume()

    # The close method needs to be patched to force writing a ZIP64 file
    # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
    def close(self):
        tmp = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP_FILECOUNT_LIMIT = 0
        super(BufferedZipFileWriter, self).close()
        zipfile.ZIP_FILECOUNT_LIMIT = tmp
        return self._buffer.consume()
于 2017-05-15T20:00:05.807 に答える
0

Python 2.7 では、ファイルの代わりに zipfile にデータを追加できます。

http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr

于 2013-09-06T18:16:40.740 に答える
0

gzip ライブラリは、圧縮のためにファイルのようなオブジェクトを受け取ります。

class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])

zip ファイルに含める名目上のファイル名を指定する必要がありますが、データ ソースを fileobj に渡すことができます。

(この回答は Damnsweet の回答とは異なります。焦点は、増分的に書き込まれる圧縮ファイルではなく、増分的に読み取られるデータ ソースにある必要があります。)

そして、元の質問者がGzipを受け入れないことがわかりました:-(

于 2008-11-18T00:02:46.367 に答える