我正在尝试使用 pycurl 获取操作系统映像并将解压缩的数据写入磁盘。使用 gzip 很简单,只有使用 lz4 格式时我会遇到问题,似乎 write_lz4(buf) 会解压缩并写入磁盘,只有当我尝试调整分区大小时,才会收到错误:
entries 为 0 字节,但该程序仅支持 128 字节的条目。 相应调整,但分区表可能是垃圾。 警告:分区表头声称分区表的大小 条目为0字节,但该程序仅支持128字节条目。 相应调整,但分区表可能是垃圾。 在内存中创建新的 GPT 条目。 操作已成功完成。 错误:分区不存在
我也可以使用 io.Byitesio 来管理它:
if url.endswith('.lz4'):
with io.BytesIO() as output_buffer:
curl.setopt(pycurl.WRITEDATA, output_buffer)
curl.perform()
output_buffer.seek(0)
decompressed_data = lz4.frame.decompress(output_buffer.read())
disk.write(decompressed_data)
但是这一步似乎没有必要。我尝试了直接方法,但没有成功。这是代码:
def write_to_disk(self, url, dev, proxy=None):
if os.path.isfile(dev):
size = os.path.getsize(dev)
with open(os.path.realpath(dev), 'wb') as disk:
disk.seek(0)
def write_gz(buf):
disk.write(d.decompress(buf))
def write_lz4(buf):
disk.write(lz4.decompress(buf))
try:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
if proxy is not False:
curl.setopt(pycurl.PROXY, proxy)
curl.setopt(pycurl.BUFFERSIZE, 1024)
if url.endswith('.lz4'):
curl.setopt(pycurl.WRITEFUNCTION, write_lz4)
elif url.endswith('.gz'):
d = zlib.decompressobj(zlib.MAX_WBITS | 32)
curl.setopt(pycurl.WRITEFUNCTION, write_gz)
curl.perform()
except pycurl.error:
return False
if os.path.isfile(dev):
disk.seek(size - 1)
disk.write(b"\0")
return True
谢谢
我最终实施了以下解决方案:
def write_to_disk(self, url, dev, proxy=None):
"""Fetch compressed OS image, decompress, and write to disk."""
if not self.is_block_device(os.path.realpath(dev)):
print(f"{dev} is not a block device")
return False
with open(os.path.realpath(dev), 'wb') as disk:
disk.seek(0)
try:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
if proxy is not False:
curl.setopt(pycurl.PROXY, proxy)
curl.setopt(pycurl.BUFFERSIZE, 524288)
if url.endswith('.gz'):
d = zlib.decompressobj(zlib.MAX_WBITS | 32)
def write_gz(buf):
disk.write(d.decompress(buf))
curl.setopt(pycurl.WRITEFUNCTION, write_gz)
curl.perform()
elif url.endswith('.zst'):
# Perform the cURL request and store the data in a buffer
buffer = bytearray()
curl.setopt(curl.WRITEFUNCTION, buffer.extend)
curl.perform()
# Decompress the received data using zstd.decompress() and write it to disk
decompressed_data = zstd.decompress(bytes(buffer))
disk.write(decompressed_data)
elif url.endswith('.lz4'):
# Perform the cURL request and store the data in a buffer
buffer = bytearray()
curl.setopt(curl.WRITEFUNCTION, buffer.extend)
curl.perform()
# Decompress the received data using lz4.frame.decompress() and write it to disk
decompressed_data = lz4.frame.decompress(bytes(buffer))
disk.write(decompressed_data)
elif url.endswith('.raw'):
def write_raw(buf):
disk.write(buf)
curl.setopt(pycurl.WRITEFUNCTION, write_raw)
curl.perform()
except pycurl.error as e:
print("PyCURL error:", e)
return False
if os.path.isfile(dev):
size = os.path.getsize(dev)
disk.seek(size - 1)
disk.write(b"\0")
return True