脚本使用 zipfile 模块使压缩文件变得越来越大,当终端被终止时停止

问题描述 投票:0回答:1

设计了一个 Tkinter GUI 应用程序,主要目标是压缩与特定模式匹配的文件。它将所需的文件压缩在一起,并且每个操作都按预期进行。唯一的问题是第一个压缩的文件在磁盘上的大小越来越大,如果我不终止终端,实际大小将达到 120GB 或更多。(原始文件为 90MB)。您可以将文件视为批次,如下所示:

  • AAAA.src
  • AAAA.txt
  • AAAA.log
  • BBBB.src
  • BBBB.log
  • 因此第一个压缩文件将是 AAAA.zip,而这个文件就会出现问题。 这是代码片段。任何帮助将不胜感激。
def copy_move_files(self, file_paths, action):
    third_input = self.get_third_input()

    for file_path in file_paths:
        if action == "copy":
            # .....
        elif action == "move":
            # .....
        elif action == "just_zip":
            file_paths_to_zip = []
            file_paths_to_delete = []
            source_with_third = os.path.join(self.source_folder, third_input)

            pattern = r'\d{8}_\d{6}_\d{14}'

            for root, _, files in os.walk(source_with_third):

                for file in files:
                    file_name = os.path.basename(file)
                    if re.match(pattern, file_name):
                        file_path = os.path.join(root, file)
                        file_paths_to_zip.append(file_path)
                        file_paths_to_delete.append(file_path)
                    else:
                        file_path = os.path.join(root, file)
                        file_paths_to_delete.append(file_path)

            for file_path in file_paths_to_zip:
                try:
                    prefix = os.path.basename(file_path)[:30]
                    zip_path = os.path.join(os.path.dirname(file_path), f'{prefix}.zip')

                    with zipfile.ZipFile(zip_path, 'a') as zipf:
                        arcname = os.path.basename(file_path)
                        zipf.write(file_path, arcname)

                    self.logger.info("File %s zipped successfully", os.path.basename(file_path))
                except Exception as e:
                    # Log the error message if an exception occurs during zipping
                    self.logger.error("Error zipping file %s: %s", os.path.basename(file_path), str(e))

            for file_path in file_paths_to_delete:
                if not file_path.endswith(".zip"):
                    os.remove(file_path)
                    self.logger.info("File %s deleted successfully", os.path.basename(file_path))

            self.logger.info("Files zipped successfully on %s", self.source_folder)


def just_zip(self):
    try:
        self.source_folder = self.source_entry.get()
        self.dest_folder = self.dest_entry.get()
        third_input = self.third_entry.get()
        last_copy_time = datetime.now().strftime("%Y-%m-%d \n %H:%M:%S")
        self.logger.info("Files will be zipped with the Chamber Name: %s", third_input)

        with open("guide.txt", "r+") as file:
            lines = file.readlines()
            lines[0] = self.source_folder + "\n"
            lines[1] = self.dest_folder + "\n"
            lines[2] = self.third_entry.get() + "\n"
            lines[3] = last_copy_time + "\n"
            file.seek(0)
            file.writelines(lines)
            file.truncate()

        with open("guide.txt", "r+") as file:
            file_lines = file.readlines()
            if len(file_lines) > 5:
                file.seek(0)
                file.writelines(file_lines[:5])
                file.truncate()

        if not self.source_folder:
            print("Please select a source.")
            self.logger.warning("No Source folder selected.")
            return

        file_paths = [os.path.join(self.source_folder, file) for file in os.listdir(self.source_folder)]
        try:
            threading.Thread(target=self.copy_move_files, args=(file_paths, "just_zip")).start()
        except Exception as e:
            logging.error("An error occurred during thread creation for zipping files: %s", str(e))
    except Exception as e:
        logging.error("An error occurred during zipping files: %s", str(e))

我尝试了仅使用写入模式的 with 块,但结果是没有压缩文件。尝试过 zipf.close()。开始对线程产生怀疑。

谢谢您!

python python-multithreading filesize python-zipfile
1个回答
0
投票

我不知道问题是什么,但对于可能有类似错误的人,我从头开始编写了一个完全不同的 zip 函数,并且效果很好。

        elif action == "just_zip":
        source_with_third = os.path.join(self.source_folder, third_input)

        all_files = []
        for root, _, files in os.walk(source_with_third):
            all_files.extend([os.path.join(root, file) for file in files])

        if not all_files:
            self.logger.error("Source folder looks empty. Check if the source path is correct")
            self.update_status("Source folder looks empty. Check if the source path is correct", is_error=True)
        else:

            for file_path in all_files:
                filename = os.path.basename(file_path)
                if not re.match(pattern, filename[:30]):
                    send2trash(file_path.replace("/","\\"))

            zip_files_exist = any(file.endswith(".zip") for file in all_files)
            if zip_files_exist:
                self.logger.error("ERROR, FILES ARE ALREADY ZIPPED YOU CAN TRY TO MOVE OR COPY THEM")
                self.update_status("Files are already zipped you can try to move or copy them", is_error=True)
            else:
                grouped_files = {}
                for file_path in all_files:
                    base_name = os.path.basename(file_path)
                    prefix = base_name[:30]
                    if prefix not in grouped_files:
                        grouped_files[prefix] = []
                    grouped_files[prefix].append(file_path)

                total_files = sum(len(files) for files in grouped_files.values())
                processed_files = 0

                for prefix, files in grouped_files.items():
                    if len(files) > 1:
                        zip_path = os.path.dirname(files[0])
                        zip_filename = os.path.join(zip_path, f"{prefix}.zip")
                        try:
                            with zipfile.ZipFile(zip_filename, "w") as zipf:
                                for file in files:
                                    try:
                                        zipf.write(file, os.path.relpath(file, zip_path))
                                    except Exception as e:
                                        self.logger.error(f"Error zipping {file}: {e}")
                                        self.update_status(f"Error zipping {file}", is_error=True)
                            self.logger.info(f"Zipped {len(files)} files with prefix '{prefix}'.")

                            for file in files:
                                send2trash(file.replace("/","\\"))
                                self.logger.info(f"Deleted {file}.")
                        except Exception as e:
                            self.logger.error(f"Error creating zip for {prefix}: {e}")
                            self.update_status(f"Error creating zip for {prefix}", is_error=True)
                    else:
                        pass
                    processed_files += len(files)
                    progress = int((processed_files / total_files) * 100)
                    self.progressbar["value"] = progress  # Update the progress bar directly
                    self.root.update_idletasks()
                
                self.mirrored_clock_label.config(text="Last Zip Time:\n" + datetime.now().strftime("%Y-%m-%d \n %H:%M:%S"))
                self.logger.info("Files zipped successfully on %s", self.source_folder)
                self.update_status("Files have been zipped SUCCESSFULLY", is_error=False)
© www.soinside.com 2019 - 2024. All rights reserved.