我有
archive.zip
,有两个文件:hello.txt
和 world.txt
我想用下面的代码,用新文件覆盖
hello.txt
文件:
import zipfile
# Open the archive in append mode and add hello.txt; the context manager
# closes the archive when the block ends.
with zipfile.ZipFile('archive.zip', 'a') as archive:
    archive.write('hello.txt')
但它不会覆盖文件,不知何故它会创建
hello.txt
的另一个实例 - 看看 winzip 屏幕截图:
既然没有像
zipfile.remove()
这样的东西,那么处理这个问题的最佳方法是什么?
Python zipfile 模块无法做到这一点。您必须创建一个新的 zip 文件并重新压缩第一个文件中的所有内容以及新的修改文件。
下面是一些代码来做到这一点。但请注意,它效率不高,因为它会解压缩然后重新压缩所有数据。
import tempfile
import zipfile
import shutil
import os
def remove_from_zip(zipfname, *filenames):
    """Rewrite the archive at *zipfname* without the entries in *filenames*.

    Every entry whose name is not listed is read from the original archive
    and written into a new archive inside a temporary directory; that new
    archive is then moved over *zipfname*.  The temporary directory is
    removed whether or not the rewrite succeeds.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        rebuilt_path = os.path.join(scratch_dir, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r') as source, \
                zipfile.ZipFile(rebuilt_path, 'w') as target:
            kept_entries = (
                entry for entry in source.infolist()
                if entry.filename not in filenames
            )
            for entry in kept_entries:
                # Passing the ZipInfo keeps the entry's stored metadata.
                target.writestr(entry, source.read(entry.filename))
        # Both archives are closed here, so the original can be replaced.
        shutil.move(rebuilt_path, zipfname)
    finally:
        shutil.rmtree(scratch_dir)
用法:
# Drop the existing hello.txt entry first, then append the file again under
# the same name.
remove_from_zip('archive.zip', 'hello.txt')
with zipfile.ZipFile('archive.zip', 'a') as archive:
    archive.write('hello.txt')
基于 nosklo 的答案。UpdateableZipFile 是一个继承自 ZipFile 的类,保持相同的接口,并增加了覆盖文件(通过 writestr 或 write)和删除文件的功能。
import os
import shutil
import tempfile
from zipfile import ZipFile, ZIP_STORED, ZipInfo
class UpdateableZipFile(ZipFile):
    """
    ZipFile subclass that can also overwrite and delete existing entries.

    To enable the update features use UpdateableZipFile with the 'with
    statement'.  Inside the block, ``writestr``/``write`` calls that target a
    name already present in the archive are buffered in temporary files, and
    ``remove_file`` marks a name for deletion.  Upon ``__exit__`` (if updates
    were recorded) the archive is rebuilt into a new zip file that overrides
    the existing one.  Outside a ``with`` block the class behaves like a
    plain ZipFile.
    """

    class DeleteMarker(object):
        """Sentinel stored in ``_replace`` for entries marked for deletion."""
        pass

    # The text type on both Python 2 (unicode) and Python 3 (str).
    _TEXT_TYPE = type(u"")

    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
        """Open the archive like ZipFile does and reset the update state."""
        # Init base
        super(UpdateableZipFile, self).__init__(file, mode=mode,
                                                compression=compression,
                                                allowZip64=allowZip64)
        # Maps archive name -> temp file holding the new content, or a
        # DeleteMarker instance when the entry must be dropped.
        self._replace = {}
        # Whether the with statement was entered
        self._allow_updates = False

    def _fresh_temp_file(self, name):
        """Return an empty temp file registered as the replacement of *name*."""
        current = self._replace.get(name)
        if current is not None and not isinstance(current, self.DeleteMarker):
            # A replacement is already pending: reuse its buffer but discard
            # the old content so the latest write wins instead of being
            # appended to the previous one.
            current.seek(0)
            current.truncate()
            return current
        # Create the temp file only when it is really needed, so no unused
        # file object is left behind.
        temp_file = self._replace[name] = tempfile.TemporaryFile()
        return temp_file

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        """Write *bytes* under the given name, overriding an existing entry.

        If the name already exists in the archive and updates are enabled,
        the data is buffered and applied when the ``with`` block exits
        (``compress_type`` is not used on that path); otherwise the call is
        forwarded to ``ZipFile.writestr``.
        """
        if isinstance(zinfo_or_arcname, ZipInfo):
            name = zinfo_or_arcname.filename
        else:
            name = zinfo_or_arcname
        # we allow overriding only if the with statement is used
        if self._allow_updates and name in self.namelist():
            data = bytes
            if isinstance(data, self._TEXT_TYPE):
                # The temp file is binary; encode text the same way
                # ZipFile.writestr does.
                data = data.encode("utf-8")
            self._fresh_temp_file(name).write(data)
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).writestr(zinfo_or_arcname,
                                                    bytes, compress_type=compress_type)

    def write(self, filename, arcname=None, compress_type=None):
        """Add the file *filename* as *arcname*, overriding an existing entry.

        If *arcname* (defaulting to *filename*) already exists in the archive
        and updates are enabled, the file content is buffered and applied
        when the ``with`` block exits; otherwise the call is forwarded to
        ``ZipFile.write``.
        """
        arcname = arcname or filename
        # we allow overriding only if the with statement is used
        if self._allow_updates and arcname in self.namelist():
            # Open the source first: if it cannot be read, no (empty)
            # replacement gets registered for the entry.
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, self._fresh_temp_file(arcname))
        # Otherwise just act normally
        else:
            super(UpdateableZipFile, self).write(filename,
                                                 arcname=arcname, compress_type=compress_type)

    def __enter__(self):
        """Enable the update features and return the archive."""
        # Allow updates
        self._allow_updates = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the archive and apply the recorded updates, if any."""
        # call base to close zip file, organically
        try:
            super(UpdateableZipFile, self).__exit__(exc_type, exc_val, exc_tb)
            if self._replace:
                self._rebuild_zip()
        finally:
            # In case rebuild zip failed,
            # be sure to still release all the temp files
            self._close_all_temp_files()
            self._allow_updates = False

    def _close_all_temp_files(self):
        """Close every pending temp file and forget all recorded updates."""
        # .values() exists on both Python 2 and 3 (itervalues() does not).
        for temp_file in self._replace.values():
            if hasattr(temp_file, 'close'):
                temp_file.close()
        self._replace.clear()

    def remove_file(self, path):
        """Mark the entry *path* for deletion when the ``with`` block exits."""
        pending = self._replace.get(path)
        if hasattr(pending, 'close'):
            # A buffered overwrite of the same entry is now moot.
            pending.close()
        self._replace[path] = self.DeleteMarker()

    def _rebuild_zip(self):
        """Write a new archive with the updates applied and swap it in."""
        tempdir = tempfile.mkdtemp()
        try:
            temp_zip_path = os.path.join(tempdir, 'new.zip')
            # Names whose update was already applied; further entries with
            # the same name are stale duplicates and must not be copied.
            handled = set()
            with ZipFile(self.filename, 'r') as zip_read:
                # Create new zip with assigned properties
                with ZipFile(temp_zip_path, 'w', compression=self.compression,
                             allowZip64=self._allowZip64) as zip_write:
                    for item in zip_read.infolist():
                        name = item.filename
                        if name in handled:
                            continue
                        # Check if the file should be replaced / or deleted
                        replacement = self._replace.pop(name, None)
                        if replacement is None:
                            # Untouched entry: copy exactly this entry's data
                            data = zip_read.read(item)
                        else:
                            handled.add(name)
                            # If marked for deletion, do not copy the file
                            if isinstance(replacement, self.DeleteMarker):
                                continue
                            # Marked for replacement: use the buffered
                            # content, then close (deleting) the temp file
                            replacement.seek(0)
                            data = replacement.read()
                            replacement.close()
                        zip_write.writestr(item, data)
            # Override the archive with the updated one
            shutil.move(temp_zip_path, self.filename)
        finally:
            shutil.rmtree(tempdir)
使用示例:
# The Windows path is a raw string so that "\T" is not parsed as an (invalid)
# escape sequence.
with UpdateableZipFile(r"C:\Temp\Test2.docx", "a") as o:
    # Overwrite a file with a string
    o.writestr("word/document.xml", "Some data")
    # exclude an existing file from the zip
    o.remove_file("word/fontTable.xml")
    # Write a new file (with no conflict) to the zip
    o.writestr("new_file", "more data")
    # Overwrite a file with a file
    o.write(r"C:\Temp\example.png", "word/settings.xml")
基于这个答案,这里有一个简单粗暴的方法:给标准库的 zipfile 打猴子补丁(monkey patch),使其支持删除文件(在等待该功能被 python:main 接受期间):
from zipfile import ZipFile, ZipInfo
from operator import attrgetter
import functools
def enable_zip_remove(func):
    """Decorator that gives ``ZipFile`` a ``remove`` method before *func* runs.

    Each time the wrapped function is called, ``ZipFile`` is patched with
    ``remove`` (and the helper ``_zipfile_remove_member``) unless it already
    has a ``remove`` attribute; then *func* is called with the original
    arguments and its result is returned.
    """

    def _zipfile_remove_member(self, member):
        """Delete the entry *member* (a ZipInfo) from the open archive.

        Entries stored after *member* are moved down over it, their header
        offsets and the start of the central directory are adjusted, and the
        file position is left at the new start of the central directory.
        """
        # get a sorted filelist by header offset, in case the dir order
        # doesn't match the actual entry order
        fp = self.fp
        entry_offset = 0
        filelist = sorted(self.filelist, key=attrgetter('header_offset'))
        last_index = len(filelist) - 1
        for i, info in enumerate(filelist):
            # skip everything stored before the target member
            if info.header_offset < member.header_offset:
                continue

            # get the total size of the entry: it ends where the next entry
            # (not moved yet) or the central directory begins
            if i == last_index:
                entry_size = self.start_dir - info.header_offset
            else:
                entry_size = filelist[i + 1].header_offset - info.header_offset

            # found the member, set the entry offset
            if member == info:
                entry_offset = entry_size
                continue

            # Move entry
            # read the actual entry data
            fp.seek(info.header_offset)
            entry_data = fp.read(entry_size)

            # update the header
            info.header_offset -= entry_offset

            # write the entry to the new position
            fp.seek(info.header_offset)
            fp.write(entry_data)
            fp.flush()

        # update state
        self.start_dir -= entry_offset
        self.filelist.remove(member)
        # Several entries may share one name (that is what repeated writes
        # in append mode produce).  Keep the name resolvable by pointing it
        # at the last remaining entry, and forget it only when none is left.
        name = member.filename
        survivor = next(
            (zi for zi in reversed(self.filelist) if zi.filename == name),
            None,
        )
        if survivor is None:
            self.NameToInfo.pop(name, None)
        else:
            self.NameToInfo[name] = survivor
        self._didModify = True

        # seek to the start of the central dir
        fp.seek(self.start_dir)

    def zipfile_remove(self, member):
        """Remove a file from the archive. The archive must be open with mode 'a'"""
        if self.mode != 'a':
            raise RuntimeError("remove() requires mode 'a'")
        if not self.fp:
            raise ValueError(
                "Attempt to write to ZIP archive that was already closed")
        if self._writing:
            raise ValueError(
                "Can't write to ZIP archive while an open writing handle exists."
            )

        # Make sure we have an info object: accept either a ZipInfo or a name
        zinfo = member if isinstance(member, ZipInfo) else self.getinfo(member)

        return self._zipfile_remove_member(zinfo)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Patch only when ZipFile has no remove attribute yet.
        if not hasattr(ZipFile, "remove"):
            ZipFile._zipfile_remove_member = _zipfile_remove_member
            ZipFile.remove = zipfile_remove
        return func(*args, **kwargs)

    return wrapper
用法:
@enable_zip_remove
def replace_zip_file():
    """Replace the hello.txt entry of archive.zip with the file on disk."""
    with ZipFile("archive.zip", "a") as archive:
        # Remove the existing entry, then append the file under that name.
        archive.remove("hello.txt")
        archive.write("hello.txt")
附注:NSFW
我的解决方案:全部读取 -> 替换 -> 写回
def read_zip(fname):
    """Read the zip file *fname* and return ``{entry name: content bytes}``.

    The whole file is loaded into memory first and parsed from there.
    """
    # Context managers close the file handle and the archive even when
    # reading fails.
    with open(fname, 'rb') as source:
        bio = BytesIO(source.read())
    with zipfile.ZipFile(bio, 'r') as archive:
        return {n: archive.read(n) for n in archive.namelist()}
def write_zip(fname, fdict):
    """Write the ``{entry name: content}`` mapping *fdict* to the zip *fname*.

    The archive is built in memory with ZIP_DEFLATED compression and then
    written to *fname* in one go, replacing any existing file.
    """
    bio = BytesIO()
    with zipfile.ZipFile(bio, 'w', zipfile.ZIP_DEFLATED) as archive:
        for name, data in fdict.items():
            archive.writestr(name, data)
    # The archive is closed (central directory written) before its bytes are
    # copied out; the context manager closes the output file.
    with open(fname, 'wb') as target:
        target.write(bio.getvalue())
我的解决方案与其他答案类似,但使用 SQLite 来管理中间文件,并提供
__getitem__
、__setitem__
和 __delitem__
来实现简单的界面。
默认情况下,数据库位于内存中,但如果您的 zip 大于可用内存,则可以提供临时文件路径。
当然,SQLite 内置于 Python 中并且比文件系统更快
import sqlite3
import subprocess
import zipfile
from pathlib import Path
from sql import CREATE_TABLE, DELETE_FILE, INSERT_FILE, SELECT_CONTENT
class EditableZip:
    """Intended to make editing files inside zip archive easy, this class is capable of loading files
    from a zip file into a sqlite database, facilitates editing/removing/adding files, and saving
    to a zip.

    The database can be in-memory (default) or in a temporary on disk file if
    temp_db_path is provided.
    If an on-disk file is used, EditableZip.close can be called to remove the file or EditableZip
    can be used as a context manager.

    If auto_save is set to True and an initial zip_path was provided then the file will
    be overwritten when EditableZip closes. If you wish to save to a different file,
    or no zip_path is used in instantiation, auto_save can take a file path.

    Files can be added by item assignment

        with EditableZip(auto_save="example.zip") as ez:
            ez["thing.txt"] = "stuff"
            # empty dir
            ez["empty/"] = None

    Assignment accepts Non-text files as bytes.
    EditableZip is subscriptable. If the subscript is a path in the db, the data will be returned
    (decoded as UTF-8 text when possible, as raw bytes otherwise).
    EditableZip.files can be used to iterate over files in the db.
    """

    def __init__(
        self,
        zip_path: None | str | Path = None,
        temp_db_path: None | Path = None,
        auto_save: bool | str | Path = False,
    ):
        """Open the database and, if zip_path is given, load that archive."""
        self.temp_db_path = temp_db_path
        self.auto_save = auto_save
        self.file_path = zip_path
        self.db = sqlite3.connect(
            str(temp_db_path if temp_db_path is not None else ":memory:")
        )
        self.db.execute(CREATE_TABLE)
        if self.file_path:
            self.load(self.file_path)

    @property
    def files(self):
        "Returns a generator of all file paths in the database."
        # The rows are fetched eagerly, so entries may be modified while
        # iterating over the returned generator.
        rows = self.db.execute("SELECT file_path FROM files").fetchall()
        return (row[0] for row in rows)

    def load(self, zip_path: str | Path) -> None:
        "Add all files from zip at zip_path to db."
        with zipfile.ZipFile(zip_path, mode="r") as archive:
            for item in archive.infolist():
                # Names ending in "/" are directories; they are stored
                # with no content.
                is_directory = item.filename.endswith("/")
                self[item.filename] = None if is_directory else archive.read(item)

    def save(self, zip_path: None | str | Path = None) -> str | Path:
        """Save all files from db to zip at zip_path and return that path.

        When zip_path is None the path given at instantiation is used.
        Raises ValueError if neither path is available.
        """
        zip_path = self.file_path if zip_path is None else zip_path
        if zip_path is None:
            raise ValueError("no zip_path given and no initial zip_path to fall back on")
        with zipfile.ZipFile(zip_path, "w") as archive:
            for file in self.files:
                file_data = self.fetch(file)
                if file_data is None:
                    # No content stored (e.g. an empty directory entry)
                    archive.writestr(zipfile.ZipInfo(file), "")
                else:
                    # Includes empty files, which are ordinary members
                    archive.writestr(file, file_data)
        return zip_path

    def close(self):
        "Auto save if applicable and close + remove db."
        if self.auto_save:
            # auto_save may be True (save to the initial path) or a path
            self.save(
                zip_path=self.auto_save
                if isinstance(self.auto_save, (str, Path))
                else None
            )
        self.db.close()
        if isinstance(self.temp_db_path, Path):
            self.temp_db_path.unlink(missing_ok=True)

    def fetch(self, file_path: str) -> bytes | None:
        "Get content of db file for file_path, or None if there is no such row."
        row = self.db.execute(SELECT_CONTENT, {"file_path": file_path}).fetchone()
        return None if row is None else row[0]

    def __getitem__(self, key):
        """Return the content stored for key.

        Bytes are decoded as UTF-8 when possible; content that is not valid
        UTF-8 (binary files) is returned as raw bytes, and any other stored
        value (such as None) is returned unchanged.
        """
        result = self.fetch(key)
        if not isinstance(result, bytes):
            return result
        try:
            return result.decode("utf-8")
        except UnicodeDecodeError:
            return result

    def __setitem__(self, file_path, content: str | bytes | None):
        "Store content for file_path; text is encoded as UTF-8 first."
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.db.execute(
            INSERT_FILE,
            {"file_path": file_path, "file_content": content},
        )

    def __delitem__(self, file_path):
        "Run the DELETE_FILE statement for file_path."
        self.db.execute(DELETE_FILE, {"file_path": file_path})

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
if __name__ == "__main__":
    # Example use case: writing an all-lowercase copy of an epub file.
    # File source:
    # https://archiveofourown.org/downloads/13795605/Victoria%20Potter%20and%20the.epub?updated_at=1650231615
    file_path = Path("Victoria Potter and the.epub")
    # Same name with "- lowercase" inserted in front of the extension
    new_file = file_path.with_name(f"{file_path.stem}- lowercase{file_path.suffix}")
    markup_suffixes = (".html", ".xhtml")

    # Lowercase every (x)html member; the result is saved to new_file when
    # the context manager closes.
    with EditableZip(zip_path=file_path, auto_save=new_file) as ez:
        for member in ez.files:
            if Path(member).suffix in markup_suffixes:
                ez[member] = ez[member].lower()
简而言之,
您可以从 https://github.com/python/cpython/blob/659eb048cc9cac73c46349eb29845bc5cd630f09/Lib/zipfile.py 获取代码并从中创建一个单独的文件。之后,只需从您的项目中引用它,而不是内置的 python 库:
import myproject.zipfile as zipfile
。
用法:
# Plain string literals: nothing is interpolated, so no f-prefix is needed.
with zipfile.ZipFile("archive.zip", "a") as z:
    z.remove("firstfile.txt")