当使用 argparse 的参数而不是默认值时,脚本不起作用

问题描述 投票:0回答:1

我尝试编写一个Python脚本来提取zip文件,复制到目录并创建PR。默认行为是不使用任何参数,仅使用默认值。当我这样做时,效果非常好。但是,当我传递参数

-z
-f
时,我收到以下错误:

235: An error occurred:
Traceback (most recent call last):
  File "/Users/ops.eekfonky/code/dev/server/trunk/deployment/./templatesToPR.py", line 217, in main
    template_name, filename = unzip_templates(zip_dir, extraction_dir)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ops.eekfonky/code/dev/server/trunk/deployment/./templatesToPR.py", line 57, in unzip_templates
    filename = next(f for f in os.listdir(zip_dir) if f.endswith('.zip'))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
StopIteration

这是完整的脚本;

#!/usr/bin/env python3

import argparse
import logging
import os
import shutil
import sys
import subprocess
import importlib
import tempfile
import uuid
import zipfile
import git
import re
from git import Repo, GitCommandError

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(lineno)d: %(message)s")

# Define default values in a dictionary
DEFAULTS = {
    "default_branch": "release/17",
    "zip_dir": "~/Downloads/templates",
}

def parse_arguments():
    """Parses command-line arguments."""
    parser = argparse.ArgumentParser(description='''
    This script handles git repositories and templates based on internal defaults or command-line overrides.
    Use command-line options to specify custom settings for the script's operation.''')
    parser.add_argument('--def-branch', '-d', default=DEFAULTS["default_branch"],
                        help='Default git branch name. Default is "%(default)s".')
    parser.add_argument('--zip-dir', '-z', default=DEFAULTS["zip_dir"],
                        help='Directory for zipped templates. Default is "%(default)s".')
    parser.add_argument('--zip_file', '-f', type=str, default=None, nargs = '?',
                        help='Specify the .zip filename explicitly. Defaults to the zip_dir if not specified.')
    return parser.parse_args()

def install_and_import(package, module_name=None):
    """Installs a package if needed and imports it."""
    module_name = module_name if module_name else package
    try:
        importlib.import_module(module_name)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        importlib.import_module(module_name)

def ensure_directory_exists(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)

def unzip_templates(zip_dir, extraction_path):
    """Extracts a template ZIP, handling 'email' and 'sms' directories and preserving nested structures."""
    zip_file_path = None

    if os.path.isdir(zip_dir):
        filename = next(f for f in os.listdir(zip_dir) if f.endswith('.zip'))
        zip_file_path = os.path.join(zip_dir, filename)
    else:
        filename = os.path.basename(zip_dir)
        zip_file_path = zip_dir

    if not filename:
        raise FileNotFoundError("No ZIP file found in the directory or provided file")

    template_name = os.path.splitext(filename)[0]  # Determine template_name

    # Rename template_name if it has spaces (optional)
    base_name, ext = os.path.splitext(template_name)
    if ' ' in base_name:
        new_base_name = base_name.replace(' ', '-')
        template_name = new_base_name + ext

    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Check for files with spaces in the name
        for zip_info in zip_ref.infolist():
            if ' ' in zip_info.filename:
                raise ValueError(f"ZIP file contains a file with a space in the name: {zip_info.filename}")

        # Extract directly into the extraction_path
        zip_ref.extractall(extraction_path)

    # Cleanup __MACOSX Directory
    macosx_dir = os.path.join(extraction_path, '__MACOSX')
    if os.path.exists(macosx_dir) and os.path.isdir(macosx_dir):
        shutil.rmtree(macosx_dir)

    logging.info(f"Extracted {filename} to {extraction_path}")
    return template_name, filename

def copy_templates_to_destination(extraction_dir, template_dir):
    """Copies extracted templates, overwrites matching files, adds new files, and skips hidden files."""

    for subdir in os.listdir(extraction_dir):
        source_dir = os.path.join(extraction_dir, subdir)
        destination_dir = os.path.join(template_dir, subdir)

        # Filter out any hidden files before copying
        non_hidden_files = [f for f in os.listdir(source_dir) if not f.startswith('.')]

        # Copy with merge
        shutil.copytree(source_dir, destination_dir, ignore=shutil.ignore_patterns('.*'), dirs_exist_ok=True)
        logging.info(f"Merged {subdir} into {destination_dir}")

def cleanup(extraction_dir, args, filename=None):
    """Clean up temporary files and optionally the original ZIP file(s)."""

    shutil.rmtree(extraction_dir)
    logging.info(f"Temporary files cleaned up from {extraction_dir}")

    zip_dir = DEFAULTS["zip_dir"]
    zip_dir = os.path.abspath(os.path.expanduser(zip_dir))
    original_zip_file = os.path.join(zip_dir, filename)

    if os.path.isfile(original_zip_file):
        os.remove(original_zip_file)
        logging.debug(f"Original ZIP file removed: {original_zip_file}")
    else:
        logging.warning(f"No {filename} for ZIP file deletion in default handling.")


class GitRepoManager:
    def __init__(self, repo_path):
        self.repo = Repo(repo_path)

    def prepare_and_create_branch(self, default_branch, template_name, user_initials):
        """Prepares the repository  and creates a new branch for template updates."""
        original_branch = self.repo.active_branch.name
        unique_suffix = str(uuid.uuid4())[:8]
        new_branch = f"{user_initials}/{template_name}-{unique_suffix}-no-build"
        stashed_changes = False

        try:
            if self.repo.is_dirty():
                logging.info("Changes detected in the working directory. Attempting to stash changes...")
                self.repo.git.stash('save', "Stashing changes before template update")
                logging.info("All changes stashed.")
                stashed_changes = True

            self.repo.git.checkout(default_branch)
            self.repo.git.pull()
            self.repo.git.checkout("HEAD", b=new_branch)
            logging.info(f"New branch '{new_branch}' created from '{default_branch}'.")

            return True, new_branch, original_branch, stashed_changes

        except git.exc.GitCommandError as e:
            logging.error(f"Error preparing repository or creating branch: {e}")
            return False, None, None, stashed_changes

    def handle_updates(self, template_dir, branch_name, original_branch, stashed_changes):
        """Stages, commits, and pushes template changes"""
        try:
            if branch_name not in self.repo.git.branch().split():
                sys.exit(f"Branch '{branch_name}' does not exist locally. Cannot proceed with push.")
            self.repo.git.add(template_dir)
            self.repo.index.commit("Update templates")
            subprocess.run(["git", "push", "origin", branch_name, "--progress"])
            self.repo.git.checkout(original_branch)
            if stashed_changes:
                self.repo.git.stash('pop')
            self.repo.git.branch('-D', branch_name)
            logging.info(f"Templates pushed and local branch '{branch_name}' removed.")
        except git.exc.GitCommandError as e:
            logging.error(f"Git operations failed: {e}")

    def backup_current_state(self):
        # Create a temporary branch to hold the current state
        try:
            self.repo.git.branch('temp_backup_branch')
        except GitCommandError:
            # Branch already exists, force update it
            self.repo.git.branch('-D', 'temp_backup_branch')
            self.repo.git.branch('temp_backup_branch')

    def rollback_to_backup(self):
        try:
            # Reset the current branch to the backup branch
            self.repo.git.reset('--hard', 'temp_backup_branch')
            print("Rollback successful.")
        except GitCommandError as e:
            print(f"Rollback failed: {e}")

def main():
    args = parse_arguments()
    install_and_import('GitPython', 'git')
    from git import Repo, GitCommandError

    # Script variables with argparse and dynamic paths
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)
    git_repo_root = os.path.abspath(os.path.join(script_dir, "../../../"))  # Example path
    template_dir = os.path.join(git_repo_root, "server/trunk/services/src/main/resources/templates")

    if args.zip_dir:
        zip_dir = os.path.abspath(os.path.expanduser(args.zip_dir))  # Ensure absolute path with ~ expansion
        if not os.path.isdir(zip_dir):
            raise ValueError("Error: When using the -z flag, you must provide a directory containing ZIP files.")

    elif args.zip_file:
        zip_dir = os.path.abspath(os.path.expanduser(args.zip_file))  # Ensure absolute path with ~ expansion
        if not os.path.isfile(zip_dir):
            raise ValueError("Error: When using the -f flag, you must provide a valid file path to a ZIP file.")
    else:
        zip_dir = DEFAULTS["zip_dir"]
        zip_dir = os.path.abspath(os.path.expanduser(zip_dir))

    default_branch = args.def_branch
    ensure_directory_exists(zip_dir)
    extraction_dir = tempfile.mkdtemp(prefix='templates-extraction-', dir='/tmp')

    try:
        if len(zip_dir) == 0:
            print("Empty directory or file not found")
            raise Exception('zip template dir is empty')

        template_name, filename = unzip_templates(zip_dir, extraction_dir)
        print(f"template name: {template_name}, filename: {filename}")
        repo_manager = GitRepoManager(git_repo_root)
        # Calculate user_initials
        whoami_output = subprocess.check_output(['whoami']).decode('utf-8').strip()
        parts = whoami_output.split('.')
        user_initials = parts[1][0] + parts[-1][0]  # Extract initials from first and last name
        success, branch_name, original_branch, stashed_changes = repo_manager.prepare_and_create_branch(default_branch, template_name, user_initials)
        if not success:
            logging.error("Failed to create the new branch. Aborting script.")
            sys.exit(1)
        copy_templates_to_destination(extraction_dir, template_dir)
        repo_manager.handle_updates(template_dir, branch_name, original_branch, stashed_changes)

        # removes everything from templates dir
        if success:
            cleanup(extraction_dir, args, filename)
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        repo_manager.backup_current_state()
        repo_manager.rollback_to_backup()

# Call your main function
if __name__ == "__main__":
    main()

为什么会出现这个错误?我是 Python 新手,所以陷入困境

python-3.x scripting
1个回答
-1
投票

问题出在这行代码:

filename = next(f for f in os.listdir(zip_dir) if f.endswith('.zip'))

当没有匹配的文件时,这会引发

StopIteration
错误。一个简单的解决方案是捕获
StopIeration
错误:

def unzip_templates(zip_dir, extraction_path):
    """Extracts a template ZIP, handling 'email' and 'sms' directories and preserving nested structures."""
    zip_file_path = None
    filename = None

    if os.path.isdir(zip_dir):
        try:
            filename = next(f for f in os.listdir(zip_dir) if f.endswith('.zip'))
            zip_file_path = os.path.join(zip_dir, filename)
        except StopIeration:
            pass
    else:
        filename = os.path.basename(zip_dir)
        zip_file_path = zip_dir

    if not filename:
        raise FileNotFoundError("No ZIP file found in the directory or provided file")

您可以通过使用

StopIeration
的两个参数形式来避免
next
异常:

filename = next((f for f in os.listdir(zip_dir) if f.endswith('.zip')), None)

如果没有

filename
文件,这会将
None
设置为
.zip
。但是,要使其起作用,您需要稍微修改一下逻辑(因为否则,如果
os.path.join
filename
,后续的
None
将失败)。也许是这样的:

def unzip_templates(zip_dir, extraction_path):
    """Extracts a template ZIP, handling 'email' and 'sms' directories and preserving nested structures."""
    zip_file_path = None

    if not zip_dir:
        raise ValueError("zip_dir must contain a path")

    if os.path.isdir(zip_dir):
        filename = next((f for f in os.listdir(zip_dir) if f.endswith(".zip")), None)
        if not filename:
            raise FileNotFoundError(
                "No ZIP file found in the directory or provided file"
            )

        zip_file_path = os.path.join(zip_dir, filename)
    else:
        filename = os.path.basename(zip_dir)
        zip_file_path = zip_dir

    template_name = os.path.splitext(filename)[0]  # Determine template_name

    ...
© www.soinside.com 2019 - 2024. All rights reserved.