I'm using a Python multiprocessing pool to download thousands of images and then processing them with Python PIL.
Everything works fine unless an image downloads corrupted, in which case PIL throws an error.
I'm looking for advice on how to re-run the pool, perhaps just re-downloading the failed image, or the whole pool; the total data per pool is about 15 MB.
I check that the returned pool data array is the expected length, but the next step throws an error because an image is corrupted.
Pool code:
pool = multiprocessing.Pool(multiprocessing.cpu_count())
func = partial(url_downloader, map_id)
data = pool.map(func, url_list)
pool.close()
pool.join()
if len(data) == len(url_list):
    for d in data:
        image = Image.open(BytesIO(d[0]))
        dst.paste(image, (d[1], d[2]))
else:
    helpers.write_log(os.getcwd(), '{} : {}'.format(map_id, 'data size mismatch, skipping'))
    return
exif_data = dst.getexif()
# https://www.awaresystems.be/imaging/tiff/tifftags/extension.html
# 0x270 ImageDescription - A string that describes the subject of the image
# 0x269 DocumentName - The name of the document from which this image was scanned.
# 0x285 PageName - The name of the page from which this image was scanned.
exif_data[0x269] = str(helpers.normalizefilename(page_meta[0]))
dst.save(os.path.join(image_folder, master_image_name), exif=exif_data)
helpers.write_to_file(os.path.join(os.getcwd(), 'index.txt'), 'a+', index_text)
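Note that the `len(data)` check above can't catch the failure shown in the traceback further down: `url_downloader` returns `None` on failure, so `data` still has the expected length but contains `None` entries. A stricter check could look like this (a sketch; `all_valid` is a hypothetical helper, not part of the original code):

```python
def all_valid(data):
    # True only if every pool result is a non-None, 3-element entry
    # (image bytes plus the two paste coordinates)
    return all(d is not None and len(d) == 3 for d in data)

print(all_valid([[b'img', 0, 0], [b'img', 256, 0]]))  # True
print(all_valid([[b'img', 0, 0], None]))              # False
```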
Download function:
def url_downloader(map_id, url):
    header = {"User-Agent": "Mozilla/5.0 (X11; CrOS "
                            "x86_64 12871.102.0) "
                            "AppleWebKit/537.36 (KHTML, "
                            "like Gecko) "
                            "Chrome/81.0.4044.141 "
                            "Safari/537.36"}
    try:
        response = requests.get(url[0], headers=header)
        if response.status_code == 200:
            image_data = response.content
            return [image_data, url[1], url[2]]
    except requests.exceptions.RequestException as e:
        helpers.write_log(os.getcwd(), '{} : {}'.format(map_id, e))
        return
The error, as requested:
Traceback (most recent call last):
  File "/home/james/mapgrabber/./map-grabber.py", line 291, in <module>
    main()
  File "/home/james/mapgrabber/./map-grabber.py", line 69, in main
    auto_map_grabber(save_path, conn)
  File "/home/james/mapgrabber/./map-grabber.py", line 166, in auto_map_grabber
    map_builder(m[1], save_path, conn)
  File "/home/james/mapgrabber/./map-grabber.py", line 247, in map_builder
    image = Image.open(BytesIO(d[0]))
TypeError: 'NoneType' object is not subscriptable
Edit:
For now I've added a simple try/except around it; maybe with a cap on the number of retries? I'm guessing it's usually just a single bad download, so that should be enough.
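A bounded-retry wrapper along those lines could look like this (a sketch; `retry` is a hypothetical helper, not part of the original code):

```python
def retry(func, args, attempts=3):
    # Call func(*args), retrying on exception up to `attempts` times;
    # if every attempt fails, return the last exception instead of raising.
    last_exc = None
    for _ in range(attempts):
        try:
            return func(*args)
        except Exception as e:
            last_exc = e
    return last_exc

print(retry(lambda x: x * 2, (5,)))              # 10
print(retry(lambda x: 1 / 0, (5,), attempts=2))  # division by zero
```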
By the way, looking at your worker function url_downloader, which just downloads a single URL, multithreading seems like a better fit.
I suggest you keep using the map method, since it provides automatic "chunking" with multiprocessing when you don't specify a chunksize argument (you can, of course, always pass an explicit chunksize value). Chunking reduces the running time when the number of submitted tasks is large. However, your worker function must be modified to handle possible exceptions and return the exception as its result. You can then scan the results to determine which arguments the worker function failed on. For demonstration purposes, the worker function worker below fails when passed the argument 2 or 9:
import multiprocessing

def worker(x):
    try:
        if x in (2, 9):
            raise Exception('oh-oh')
        # Otherwise do some work
    except Exception as e:
        return e
    else:
        return None

def main():
    pool = multiprocessing.Pool()
    args = [4, 3, 2, 8, 9, 6]
    results = pool.map(worker, args)
    failed_arguments = [
        args[index] for index, result in enumerate(results)
        if isinstance(result, Exception)
    ]
    pool.close()
    pool.join()
    print(failed_arguments)

if __name__ == '__main__':
    main()
Prints:
[2, 9]
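The same idea extends to retrying: feed the failed arguments back into another pool.map call a bounded number of times. A sketch under the demo setup above (`map_with_retries` is a hypothetical helper; the worker fails permanently on 2 and 9 to keep the output deterministic):

```python
import multiprocessing

def worker(x):
    # Return the exception instead of raising, so the parent
    # can see which arguments failed.
    try:
        if x in (2, 9):  # simulate arguments that always fail
            raise Exception('oh-oh')
        return x * x
    except Exception as e:
        return e

def map_with_retries(args, attempts=3):
    # Map worker over args, re-submitting failed arguments up to
    # `attempts` times; returns (successful results, arguments
    # that never succeeded).
    results = {}
    pending = list(args)
    with multiprocessing.Pool() as pool:
        for _ in range(attempts):
            outcome = pool.map(worker, pending)
            failed = []
            for arg, res in zip(pending, outcome):
                if isinstance(res, Exception):
                    failed.append(arg)
                else:
                    results[arg] = res
            pending = failed
            if not pending:
                break
    return results, pending

if __name__ == '__main__':
    ok, failed = map_with_retries([4, 3, 2, 8, 9, 6])
    print(ok)      # {4: 16, 3: 9, 8: 64, 6: 36}
    print(failed)  # [2, 9]
```

In your case, the permanently failed URLs would then go to helpers.write_log instead of being pasted.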