覆盖 ziparchive 中的文件

问题描述 投票:0回答:6

我有

archive.zip
,有两个文件:
hello.txt
world.txt

我想用下面的代码,以新文件覆盖现有的

hello.txt
文件:

import zipfile

# Mode 'a' appends to the archive: writing a name that is already present
# adds a second entry instead of replacing the existing one.
# The with-statement closes the archive even if write() raises.
with zipfile.ZipFile('archive.zip', 'a') as z:
    z.write('hello.txt')

但它不会覆盖文件,不知何故它会创建

hello.txt
的另一个实例 - 看看 winzip 屏幕截图:

alt text

既然没有像

zipfile.remove()
这样的东西,那么处理这个问题的最佳方法是什么?

python zip
6个回答
48
投票

Python zipfile 模块无法做到这一点。您必须创建一个新的 zip 文件并重新压缩第一个文件中的所有内容以及新的修改文件。

下面是一些代码来做到这一点。但请注意,它效率不高,因为它会解压缩然后重新压缩所有数据。

import tempfile
import zipfile
import shutil
import os

def remove_from_zip(zipfname, *filenames):
    """Rewrite the archive *zipfname* without the members named in *filenames*.

    Every surviving member is read from the old archive and written into a
    new archive built inside a scratch directory; the new archive is then
    moved over the original. Names that are not present are ignored.
    """
    scratch = tempfile.mkdtemp()
    try:
        rebuilt = os.path.join(scratch, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r') as src, \
                zipfile.ZipFile(rebuilt, 'w') as dst:
            for info in src.infolist():
                if info.filename in filenames:
                    # Member is on the removal list: do not carry it over.
                    continue
                dst.writestr(info, src.read(info.filename))
        # Both archives are closed here, so the swap is safe.
        shutil.move(rebuilt, zipfname)
    finally:
        shutil.rmtree(scratch)

用法:

# Drop the stale member first, then append the fresh copy from disk.
remove_from_zip('archive.zip', 'hello.txt')
with zipfile.ZipFile('archive.zip', 'a') as archive:
    archive.write('hello.txt')

30
投票

基于 nosklo 的答案:UpdateableZipFile 是一个继承自 ZipFile 的类,保持相同的接口,但增加了覆盖文件(通过 writestr 或 write)和删除文件的功能。

import os
import shutil
import tempfile
from zipfile import ZipFile, ZIP_STORED, ZipInfo


class UpdateableZipFile(ZipFile):
    """ZipFile that can also overwrite and delete existing members.

    Use it as a context manager. Inside the ``with`` block, ``writestr`` and
    ``write`` calls that target a name already present in the archive are
    buffered in temporary files, and ``remove_file`` queues a deletion.
    On ``__exit__`` the archive is closed and, if anything was queued, it is
    rebuilt into a new file that replaces the original.

    Outside a ``with`` block the class behaves like a plain ``ZipFile``.
    Rebuilding reopens ``self.filename``, so the archive must have been
    opened from a path rather than from a file-like object.
    """

    class DeleteMarker(object):
        """Sentinel stored in ``_replace`` for members queued for deletion."""

    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
        super(UpdateableZipFile, self).__init__(file, mode=mode,
                                                compression=compression,
                                                allowZip64=allowZip64)
        # Maps member name -> open temp file with the new content,
        # or a DeleteMarker instance when the member is to be dropped.
        self._replace = {}
        # True only between __enter__ and __exit__.
        self._allow_updates = False

    def _fresh_replacement(self, name):
        """Return an empty temp file registered as the replacement for *name*.

        An already registered temp file is truncated and reused so that the
        last write wins; a pending deletion is turned into a replacement.
        """
        pending = self._replace.get(name)
        if pending is not None and not isinstance(pending, self.DeleteMarker):
            pending.seek(0)
            pending.truncate()
            return pending
        temp_file = self._replace[name] = tempfile.TemporaryFile()
        return temp_file

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        """Write *bytes* under the given name, overwriting an existing member.

        ``zinfo_or_arcname`` is a member name or a ``ZipInfo``. Text is
        encoded as UTF-8, mirroring ``ZipFile.writestr``. When the name
        already exists and updates are enabled, the data is buffered for the
        rebuild (``compress_type`` is not used for buffered data); otherwise
        the call is forwarded to ``ZipFile.writestr``.
        """
        if isinstance(zinfo_or_arcname, ZipInfo):
            name = zinfo_or_arcname.filename
        else:
            name = zinfo_or_arcname
        if self._allow_updates and name in self.namelist():
            # The parameter shadows the builtin ``bytes``, hence type(u"").
            if isinstance(bytes, type(u"")):
                bytes = bytes.encode("utf-8")
            self._fresh_replacement(name).write(bytes)
        else:
            super(UpdateableZipFile, self).writestr(zinfo_or_arcname,
                                                    bytes, compress_type=compress_type)

    def write(self, filename, arcname=None, compress_type=None):
        """Add the file *filename* as *arcname*, overwriting an existing member.

        When *arcname* (default: *filename*) already exists and updates are
        enabled, the file content is buffered for the rebuild; otherwise the
        call is forwarded to ``ZipFile.write``.
        """
        arcname = arcname or filename
        if self._allow_updates and arcname in self.namelist():
            temp_file = self._fresh_replacement(arcname)
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, temp_file)
        else:
            super(UpdateableZipFile, self).write(filename,
                                                 arcname=arcname, compress_type=compress_type)

    def __enter__(self):
        """Enable overwrite/delete handling and return the archive."""
        self._allow_updates = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the archive, then apply the queued replacements/deletions."""
        try:
            # The base class closes the archive (flushing normal writes).
            super(UpdateableZipFile, self).__exit__(exc_type, exc_val, exc_tb)
            if self._replace:
                self._rebuild_zip()
        finally:
            # Release the temp files even when the rebuild failed.
            self._close_all_temp_files()
            self._allow_updates = False

    def _close_all_temp_files(self):
        """Close every buffered temp file and forget all queued changes."""
        # .values() works on Python 2 and 3 (itervalues() is Python 2 only).
        for pending in self._replace.values():
            if hasattr(pending, 'close'):
                pending.close()
        self._replace.clear()

    def remove_file(self, path):
        """Queue the member *path* for deletion at the next rebuild."""
        pending = self._replace.get(path)
        if hasattr(pending, 'close'):
            # A buffered overwrite is superseded by the deletion.
            pending.close()
        self._replace[path] = self.DeleteMarker()

    def _rebuild_zip(self):
        """Write a new archive with the queued changes and swap it in."""
        tempdir = tempfile.mkdtemp()
        try:
            temp_zip_path = os.path.join(tempdir, 'new.zip')
            # Names whose replacement was already written; guards against
            # archives that contain the same name more than once.
            replaced = set()
            with ZipFile(self.filename, 'r') as zip_read:
                # Create new zip with the same properties as this archive.
                with ZipFile(temp_zip_path, 'w', compression=self.compression,
                             allowZip64=self._allowZip64) as zip_write:
                    for item in zip_read.infolist():
                        replacement = self._replace.get(item.filename)
                        if isinstance(replacement, self.DeleteMarker):
                            # Marked for deletion: do not copy.
                            continue
                        if replacement is not None:
                            if item.filename in replaced:
                                # Stale duplicate of a replaced member.
                                continue
                            replaced.add(item.filename)
                            replacement.seek(0)
                            data = replacement.read()
                            # Closing deletes the temp file.
                            replacement.close()
                        else:
                            # Read by ZipInfo so duplicates keep their own data.
                            data = zip_read.read(item)
                        zip_write.writestr(item, data)
            # Override the archive with the updated one.
            shutil.move(temp_zip_path, self.filename)
        finally:
            shutil.rmtree(tempdir)

使用示例:

# Raw strings keep the Windows backslashes literal ("\T" is not a valid escape).
with UpdateableZipFile(r"C:\Temp\Test2.docx", "a") as o:
    # Overwrite an existing member with a string
    o.writestr("word/document.xml", "Some data")
    # Exclude an existing member from the zip
    o.remove_file("word/fontTable.xml")
    # Write a new member (no name conflict) to the zip
    o.writestr("new_file", "more data")
    # Overwrite an existing member with the contents of a file on disk
    o.write(r"C:\Temp\example.png", "word/settings.xml")

1
投票

基于这个答案,这里有一个简单粗暴的方法:给标准库自带的 zipfile 打猴子补丁(monkey patch),使其支持文件删除(在等待该功能被合并进 python:main 期间使用):

from zipfile import ZipFile, ZipInfo
from operator import attrgetter
import functools

def enable_zip_remove(func):
    """Decorator that gives ``ZipFile`` a ``remove`` method before *func* runs.

    On each call of the wrapped function, if ``ZipFile`` has no ``remove``
    attribute, ``_zipfile_remove_member`` and ``remove`` are attached to the
    class; *func* is then called and its result returned.
    """

    def _zipfile_remove_member(self, member):
        """Drop *member* (a ZipInfo) by sliding every later entry over it."""
        handle = self.fp
        shift = 0
        # Walk entries in on-disk order; the directory order may differ.
        ordered = sorted(self.filelist, key=attrgetter('header_offset'))
        last_index = len(ordered) - 1
        for index, entry in enumerate(ordered):
            # Entries stored before the target stay where they are.
            if entry.header_offset < member.header_offset:
                continue

            # An entry spans up to the next header, or up to the central
            # directory when it is the last one.
            following = (self.start_dir if index == last_index
                         else ordered[index + 1].header_offset)
            span = following - entry.header_offset

            # The target itself: its size is how far the rest moves back.
            if member == entry:
                shift = span
                continue

            # Copy the entry's bytes to their new, earlier position.
            handle.seek(entry.header_offset)
            payload = handle.read(span)
            entry.header_offset -= shift
            handle.seek(entry.header_offset)
            handle.write(payload)
            handle.flush()

        # Bookkeeping: the central directory now starts earlier and the
        # member is no longer listed.
        self.start_dir -= shift
        self.filelist.remove(member)
        del self.NameToInfo[member.filename]
        self._didModify = True

        # Leave the file positioned at the start of the central directory.
        handle.seek(self.start_dir)

    def zipfile_remove(self, member):
        """Remove a member (name or ZipInfo); the archive must be in mode 'a'.

        Raises RuntimeError for any other mode, and ValueError when the
        archive is closed or a writing handle is still open.
        """
        if self.mode != 'a':
            raise RuntimeError("remove() requires mode 'a'")
        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists."
            )

        # Resolve a name to its info object; pass an info object through.
        target = member if isinstance(member, ZipInfo) else self.getinfo(member)
        return self._zipfile_remove_member(target)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Patch lazily, and only when ZipFile has no remove() of its own.
        if not hasattr(ZipFile, "remove"):
            ZipFile._zipfile_remove_member = _zipfile_remove_member
            ZipFile.remove = zipfile_remove
        return func(*args, **kwargs)

    return wrapper

用法:

@enable_zip_remove
def replace_zip_file():
    """Swap the archived hello.txt for the current on-disk copy."""
    with ZipFile("archive.zip", "a") as archive:
        # Remove the old entry first so the write does not add a duplicate.
        archive.remove("hello.txt")
        archive.write("hello.txt")

附注:NSFW(不建议在正式的生产环境中使用)。


1
投票

我的解决方案:全部读取 -> 替换 -> 写回

def read_zip(fname):
    """Return a dict mapping every member name in the zip *fname* to its bytes.

    The whole file is loaded into memory first, so nothing stays open on
    disk once the function returns.
    """
    # Context managers close the file handle and the archive deterministically.
    with open(fname, 'rb') as source:
        buffer = BytesIO(source.read())
    with zipfile.ZipFile(buffer, 'r') as archive:
        return {name: archive.read(name) for name in archive.namelist()}

def write_zip(fname, fdict):
    """Write the name -> data mapping *fdict* to *fname* as a deflated zip.

    The archive is assembled in memory and written to disk in one step, so
    *fname* is only touched after every member was added successfully.
    """
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
        for name, data in fdict.items():
            archive.writestr(name, data)
    # The archive must be closed (central directory written) before the
    # buffer is copied out; the with-statement also closes the target file.
    with open(fname, 'wb') as target:
        target.write(buffer.getvalue())


0
投票

我的解决方案与其他答案类似,但使用 SQLite 来管理中间文件,并提供

__getitem__
__setitem__
__delitem__
来实现简单的接口。默认情况下,数据库位于内存中,但如果您的 zip 大于可用内存,则可以提供临时文件路径。当然,SQLite 内置于 Python 中,并且比文件系统更快。

import sqlite3
import subprocess
import zipfile
from pathlib import Path

from sql import CREATE_TABLE, DELETE_FILE, INSERT_FILE, SELECT_CONTENT


class EditableZip:
    """Edit the contents of a zip archive through a dict-like interface.

    Files from a zip are loaded into a sqlite database, can be edited,
    removed or added there, and are written back out as a zip by ``save``.
    The database is in-memory by default, or lives in a temporary on-disk
    file when temp_db_path is provided.

    If an on-disk file is used, EditableZip.close can be called to remove
    the file, or EditableZip can be used as a context manager.

    If auto_save is set to True and an initial zip_path was provided, that
    file is overwritten when EditableZip closes. To save to a different
    file, or when no zip_path was given, auto_save can be a file path.

    Files can be added by item assignment
    with EditableZip(auto_save="example.zip") as ez:
        ez["thing.txt"] = "stuff"
        # empty dir
        ez["empty/"] = None

    Assignment accepts non-text files as bytes.

    EditableZip is subscriptable. If the subscript is a path in the db, the
    data is returned: as ``str`` when it is valid UTF-8, otherwise as the
    raw ``bytes``.

    EditableZip.files can be used to iterate over files in the db.

    NOTE(review): the SQL statements come from the project-local ``sql``
    module, which is not visible here; overwriting an existing path via
    assignment presumably relies on INSERT_FILE replacing rows - confirm.
    """

    def __init__(
        self,
        zip_path: None | str | Path = None,
        temp_db_path: None | Path = None,
        auto_save: bool | str | Path = False,
    ):
        self.temp_db_path, self.auto_save, self.file_path = (
            temp_db_path,
            auto_save,
            zip_path,
        )
        self.db = sqlite3.connect(
            str(temp_db_path if temp_db_path is not None else ":memory:")
        )
        self.db.execute(CREATE_TABLE)

        if self.file_path:
            self.load(self.file_path)

    @property
    def files(self):
        """Return a generator of all file paths in the database."""
        try:
            return (
                i[0] for i in self.db.execute("SELECT file_path FROM files").fetchall()
            )
        except TypeError:
            return None

    def load(self, zip_path: str | Path) -> None:
        """Add all files from the zip at zip_path to the db.

        Directory entries (names ending in "/") are stored with no content.
        """
        with zipfile.ZipFile(zip_path, mode="r") as archive:
            for item in archive.infolist():
                # endswith() is also safe for an empty member name.
                self[item.filename] = (
                    None if item.filename.endswith("/") else archive.read(item)
                )

    def save(self, zip_path: None | str | Path = None) -> None | str | Path:
        """Save all files from the db to a zip and return the path used.

        zip_path defaults to the path the instance was created with.
        """
        zip_path = self.file_path if zip_path is None else zip_path
        with zipfile.ZipFile(zip_path, "w") as archive:
            for file in self.files:
                file_data = self.fetch(file)
                if file_data is None:
                    # No stored content (e.g. a directory): write a bare entry.
                    archive.writestr(zipfile.ZipInfo(file), "")
                else:
                    # Includes empty files, which are regular members.
                    archive.writestr(file, file_data)
        return zip_path

    def close(self):
        """Auto save if applicable, then close the db and remove its file."""
        if self.auto_save:
            self.save(
                zip_path=self.auto_save
                if isinstance(self.auto_save, (str, Path))
                else None
            )
        self.db.close()
        if isinstance(self.temp_db_path, Path):
            self.temp_db_path.unlink(missing_ok=True)

    def fetch(self, file_path: str) -> bytes | None:
        """Get the stored content for file_path, or None if there is no row."""
        row = self.db.execute(SELECT_CONTENT, {"file_path": file_path}).fetchone()
        return None if row is None else row[0]

    def __getitem__(self, key):
        result = self.fetch(key)
        try:
            return result.decode("utf-8")
        except (AttributeError, UnicodeDecodeError):
            # AttributeError: nothing to decode (None or already text).
            # UnicodeDecodeError: binary member, hand back the raw bytes.
            return result

    def __setitem__(self, file_path, content: None | str | bytes):
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.db.execute(
            INSERT_FILE,
            {"file_path": file_path, "file_content": content},
        )

    def __delitem__(self, file_path):
        self.db.execute(DELETE_FILE, {"file_path": file_path})

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


if __name__ == "__main__":
    # A use case: editing epub files.
    # File source:
    # https://archiveofourown.org/downloads/13795605/Victoria%20Potter%20and%20the.epub?updated_at=1650231615
    file_path = Path("Victoria Potter and the.epub")
    # with_name() keeps the whole stem; building the name with with_suffix()
    # would cut off everything after the last dot of a dotted stem.
    new_file = file_path.with_name(f"{file_path.stem}- lowercase{file_path.suffix}")

    # Create a copy of the epub with all letters lowercase
    with EditableZip(zip_path=file_path, auto_save=new_file) as ez:
        for file in ez.files:
            if Path(file).suffix in (".html", ".xhtml"):
                ez[file] = ez[file].lower()

-1
投票

参考:使用ZipFile模块从zip文件中删除文件

简而言之,

您可以从 https://github.com/python/cpython/blob/659eb048cc9cac73c46349eb29845bc5cd630f09/Lib/zipfile.py 获取代码并从中创建一个单独的文件。之后,只需从您的项目中引用它,而不是内置的 python 库:

import myproject.zipfile as zipfile

用法:

# Plain string literals: the f-prefix had no placeholders to format.
with zipfile.ZipFile("archive.zip", "a") as z:
    z.remove("firstfile.txt")
© www.soinside.com 2019 - 2024. All rights reserved.