设计了一个 Tkinter GUI 应用程序,主要目标是压缩与特定模式匹配的文件。它将所需的文件压缩在一起,并且每个操作都按预期进行。唯一的问题是第一个压缩的文件在磁盘上的大小越来越大,如果我不终止终端,实际大小将达到 120GB 或更多。(原始文件为 90MB)。您可以将文件视为批次,如下所示:
def copy_move_files(self, file_paths, action):
third_input = self.get_third_input()
for file_path in file_paths:
if action == "copy":
# .....
elif action == "move":
# .....
elif action == "just_zip":
file_paths_to_zip = []
file_paths_to_delete = []
source_with_third = os.path.join(self.source_folder, third_input)
pattern = r'\d{8}_\d{6}_\d{14}'
for root, _, files in os.walk(source_with_third):
for file in files:
file_name = os.path.basename(file)
if re.match(pattern, file_name):
file_path = os.path.join(root, file)
file_paths_to_zip.append(file_path)
file_paths_to_delete.append(file_path)
else:
file_path = os.path.join(root, file)
file_paths_to_delete.append(file_path)
for file_path in file_paths_to_zip:
try:
prefix = os.path.basename(file_path)[:30]
zip_path = os.path.join(os.path.dirname(file_path), f'{prefix}.zip')
with zipfile.ZipFile(zip_path, 'a') as zipf:
arcname = os.path.basename(file_path)
zipf.write(file_path, arcname)
self.logger.info("File %s zipped successfully", os.path.basename(file_path))
except Exception as e:
# Log the error message if an exception occurs during zipping
self.logger.error("Error zipping file %s: %s", os.path.basename(file_path), str(e))
for file_path in file_paths_to_delete:
if not file_path.endswith(".zip"):
os.remove(file_path)
self.logger.info("File %s deleted successfully", os.path.basename(file_path))
self.logger.info("Files zipped successfully on %s", self.source_folder)
def just_zip(self):
try:
self.source_folder = self.source_entry.get()
self.dest_folder = self.dest_entry.get()
third_input = self.third_entry.get()
last_copy_time = datetime.now().strftime("%Y-%m-%d \n %H:%M:%S")
self.logger.info("Files will be zipped with the Chamber Name: %s", third_input)
with open("guide.txt", "r+") as file:
lines = file.readlines()
lines[0] = self.source_folder + "\n"
lines[1] = self.dest_folder + "\n"
lines[2] = self.third_entry.get() + "\n"
lines[3] = last_copy_time + "\n"
file.seek(0)
file.writelines(lines)
file.truncate()
with open("guide.txt", "r+") as file:
file_lines = file.readlines()
if len(file_lines) > 5:
file.seek(0)
file.writelines(file_lines[:5])
file.truncate()
if not self.source_folder:
print("Please select a source.")
self.logger.warning("No Source folder selected.")
return
file_paths = [os.path.join(self.source_folder, file) for file in os.listdir(self.source_folder)]
try:
threading.Thread(target=self.copy_move_files, args=(file_paths, "just_zip")).start()
except Exception as e:
logging.error("An error occurred during thread creation for zipping files: %s", str(e))
except Exception as e:
logging.error("An error occurred during zipping files: %s", str(e))
我尝试了仅使用写入模式的 with 块,但结果是没有压缩文件。尝试过 zipf.close()。开始对线程产生怀疑。
谢谢您!
我不知道问题是什么,但对于可能有类似错误的人,我从头开始编写了一个完全不同的 zip 函数,并且效果很好。
elif action == "just_zip":
source_with_third = os.path.join(self.source_folder, third_input)
all_files = []
for root, _, files in os.walk(source_with_third):
all_files.extend([os.path.join(root, file) for file in files])
if not all_files:
self.logger.error("Source folder looks empty. Check if the source path is correct")
self.update_status("Source folder looks empty. Check if the source path is correct", is_error=True)
else:
for file_path in all_files:
filename = os.path.basename(file_path)
if not re.match(pattern, filename[:30]):
send2trash(file_path.replace("/","\\"))
zip_files_exist = any(file.endswith(".zip") for file in all_files)
if zip_files_exist:
self.logger.error("ERROR, FILES ARE ALREADY ZIPPED YOU CAN TRY TO MOVE OR COPY THEM")
self.update_status("Files are already zipped you can try to move or copy them", is_error=True)
else:
grouped_files = {}
for file_path in all_files:
base_name = os.path.basename(file_path)
prefix = base_name[:30]
if prefix not in grouped_files:
grouped_files[prefix] = []
grouped_files[prefix].append(file_path)
total_files = sum(len(files) for files in grouped_files.values())
processed_files = 0
for prefix, files in grouped_files.items():
if len(files) > 1:
zip_path = os.path.dirname(files[0])
zip_filename = os.path.join(zip_path, f"{prefix}.zip")
try:
with zipfile.ZipFile(zip_filename, "w") as zipf:
for file in files:
try:
zipf.write(file, os.path.relpath(file, zip_path))
except Exception as e:
self.logger.error(f"Error zipping {file}: {e}")
self.update_status(f"Error zipping {file}", is_error=True)
self.logger.info(f"Zipped {len(files)} files with prefix '{prefix}'.")
for file in files:
send2trash(file.replace("/","\\"))
self.logger.info(f"Deleted {file}.")
except Exception as e:
self.logger.error(f"Error creating zip for {prefix}: {e}")
self.update_status(f"Error creating zip for {prefix}", is_error=True)
else:
pass
processed_files += len(files)
progress = int((processed_files / total_files) * 100)
self.progressbar["value"] = progress # Update the progress bar directly
self.root.update_idletasks()
self.mirrored_clock_label.config(text="Last Zip Time:\n" + datetime.now().strftime("%Y-%m-%d \n %H:%M:%S"))
self.logger.info("Files zipped successfully on %s", self.source_folder)
self.update_status("Files have been zipped SUCCESSFULLY", is_error=False)