使用Python的FIM工具

问题描述 投票:0回答:1

我想监控文件,并且已经编写了一段用于文件监控的 Python 代码。我希望当文件发生任何更改时,程序能自动将更改前的原始文件复制到安全文件夹中,以便稍后可以将安全文件夹中的文件与目标文件夹中的文件进行比较。例如,我在名为“目标文件夹”的文件夹中有一个 txt 文件或 Word 文档。如果该文件的内容或数据被更改,更改后的文件应保留在目标文件夹中,而保存着原始/旧内容的原始文件应自动复制到另一个名为“安全文件夹”的文件夹中。

请帮我解决问题。

我已经添加了我的代码以供参考。

import os
import logging
import hashlib
import time
import openpyxl
import shutil
from PIL import Image
from docx import Document
import uuid
from PyPDF2 import PdfFileReader
from pptx import Presentation
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger; a file handler is attached to it further down the file.
logger = logging.getLogger(__name__)

def process_file_changes(filename, event_id, safe_folder_path):
    """
    Compares a file with the baseline, copies it to the safe folder and refreshes the baseline.

    The baseline is read from "baseline.txt" in the current working directory,
    one "path|hash|event_id" record per line, and is rewritten in full once the
    file has been processed.

    Args:
        filename: The path to the file to be processed.
        event_id: The unique event ID associated with the file event. It is
            stored in the baseline record written for this file.
        safe_folder_path: The path to the safe folder where the file is copied.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)

    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Check if the file exists in the baseline
        if filename in baseline_data:
            # NOTE: by the time a change is noticed the file on disk already
            # holds the new content, so this copy is the changed version.
            os.makedirs(safe_folder_path, exist_ok=True)
            safe_file_path = os.path.join(safe_folder_path, os.path.basename(filename))
            shutil.copy2(filename, safe_file_path)
            logger.info(f"[Event ID: 106] [{filename}] Original file copied to safe folder as it has been changed.")

        # Calculate hash of the current file
        current_hash = calculate_file_hash(filename)

        # Compare with baseline
        if filename not in baseline_data:
            logger.info(f"[Event ID: 101] [{filename}] New file detected.")
        elif current_hash != baseline_data[filename]["hash"]:
            logger.info(f"[Event ID: 103] [{filename}] File changed.")
        else:
            logger.info(f"[Event ID: 100] [{filename}] No change in file.")

        # Update baseline data with the new hash and the caller's event ID
        baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")


def process_pptx_changes(filename, event_id):
    """
    Processes changes in a PowerPoint (PPTX) file by comparing it with the baseline.

    The baseline is read from "baseline.txt" in the current working directory,
    one "path|hash|event_id" record per line, and is rewritten in full with
    this file's new hash.

    Args:
        filename: The path to the PowerPoint file to be processed.
        event_id: The unique event ID associated with the file event. It is
            stored in the baseline record written for this file.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Calculate hash of the current PowerPoint file
        current_hash = calculate_file_hash(filename)

        # Compare with baseline
        if filename not in baseline_data:
            logger.info(f"[Event ID: 101] [{filename}] New PowerPoint document detected.")
        elif current_hash != baseline_data[filename]["hash"]:
            logger.info(f"[Event ID: 103] [{filename}] PowerPoint document changed.")
        else:
            logger.info(f"[Event ID: 100] [{filename}] No change in PowerPoint document.")

        # Update baseline data with the new hash and the caller's event ID
        baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def process_image_changes(filename, event_id):
    """
    Processes changes in an image file by comparing it with the baseline.

    The baseline is read from "baseline.txt" in the current working directory,
    one "path|hash|event_id" record per line, and is rewritten in full with
    this file's new hash.

    Args:
        filename: The path to the image file to be processed.
        event_id: The unique event ID associated with the file event. It is
            stored in the baseline record written for this file.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Calculate hash of the current image
        current_hash = calculate_file_hash(filename)

        # Compare with baseline
        if filename not in baseline_data:
            logger.info(f"[Event ID: 101] [{filename}] New image detected.")
        elif current_hash != baseline_data[filename]["hash"]:
            logger.info(f"[Event ID: 103] [{filename}] Image changed.")
        else:
            logger.info(f"[Event ID: 100] [{filename}] No change in image.")

        # Update baseline data with the new hash and the caller's event ID
        baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def process_excel_changes(filename, event_id):
    """
    Processes changes in an Excel file by comparing it with the baseline.

    Every data row of the active sheet is logged, then the workbook's hash in
    "baseline.txt" is refreshed. The first row of the sheet is used as the
    column headers.

    Args:
        filename: The path to the Excel file to be processed.
        event_id: The unique event ID associated with the file event. It is
            used in the log messages and stored in the baseline record.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Load the current Excel file
        wb = openpyxl.load_workbook(filename)
        sheet = wb.active

        # Get the column names
        column_names = [cell.value for cell in next(sheet.iter_rows())]

        # The baseline file only holds path|hash|event_id, so no per-row data
        # is ever loaded from it. Create the entry and its row map on demand so
        # that a workbook missing from the baseline no longer raises KeyError.
        entry = baseline_data.setdefault(filename, {"hash": "", "event_id": event_id})
        known_rows = entry.setdefault("rows", {})

        # Compare row values with the baseline
        for row in sheet.iter_rows(min_row=2):  # Start from the second row (assuming the first row contains headers)
            row_data = {column_names[i]: cell.value for i, cell in enumerate(row)}
            row_key = '|'.join(str(row_data[col]) for col in column_names)

            if row_key not in known_rows:
                logger.info(f"[Event ID: {event_id}] [{filename}] New row: {row_data}")
            else:
                old_row_data = known_rows.get(row_key)
                if row_data != old_row_data:
                    logger.info(f"[Event ID: {event_id}] [{filename}] Row changed: {row_data}")

                    # Update the baseline data with new row values
                    known_rows[row_key] = row_data

        # Update the baseline data with the new file hash and the caller's event ID
        entry["hash"] = calculate_file_hash(filename)
        entry["event_id"] = event_id

        # Save updated baseline data (row data is not part of the file format)
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def process_word_changes(filename, event_id):
    """
    Processes changes in a Word document by comparing it with the baseline.

    Files whose name starts with "~$" are skipped. If the document no longer
    exists, its record is removed from "baseline.txt"; otherwise its hash is
    compared with the stored one and the record is refreshed.

    Args:
        filename: The path to the Word document to be processed.
        event_id: The unique event ID associated with the file event. It is
            used in the deletion log messages and stored in the baseline record.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # ``filename`` is a path, so the "~$" prefix has to be tested on the
        # file name itself, not on the start of the whole path.
        if os.path.basename(filename).startswith('~$'):
            # Skip temporary Word files
            return

        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        if not os.path.exists(filename):
            if filename not in baseline_data:
                logger.warning(f"[Event ID: {event_id}] [{filename}] Word document deletion event occurred but not tracked.")
                return
            logger.info(f"[Event ID: {event_id}] [{filename}] Word document has been deleted.")
            # Fall through to the save below so the removal reaches the file.
            del baseline_data[filename]
        else:
            # Calculate hash of the current Word document
            current_hash = calculate_file_hash(filename)

            # Compare with baseline
            if filename not in baseline_data:
                logger.info(f"[Event ID: 101] [{filename}] New Word document detected.")
            elif current_hash != baseline_data[filename]["hash"]:
                logger.info(f"[Event ID: 103] [{filename}] Word document changed.")
            else:
                logger.info(f"[Event ID: 100] [{filename}] No change in Word document.")

            # Update baseline data with the new hash and the caller's event ID
            baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def process_pdf_changes(filename, event_id):
    """
    Processes changes in a PDF file by comparing it with the baseline.

    The baseline is read from "baseline.txt" in the current working directory,
    one "path|hash|event_id" record per line, and is rewritten in full with
    this file's new hash.

    Args:
        filename: The path to the PDF file to be processed.
        event_id: The unique event ID associated with the file event. It is
            stored in the baseline record written for this file.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Calculate hash of the current PDF file
        current_hash = calculate_file_hash(filename)

        # Compare with baseline
        if filename not in baseline_data:
            logger.info(f"[Event ID: 101] [{filename}] New PDF document detected.")
        elif current_hash != baseline_data[filename]["hash"]:
            logger.info(f"[Event ID: 103] [{filename}] PDF document changed.")
        else:
            logger.info(f"[Event ID: 100] [{filename}] No change in PDF document.")

        # Update baseline data with the new hash and the caller's event ID
        baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def process_text_changes(filename, event_id):
    """
    Processes changes in a text file by comparing it with the baseline.

    The baseline is read from "baseline.txt" in the current working directory,
    one "path|hash|event_id" record per line, and is rewritten in full with
    this file's new hash.

    Args:
        filename: The path to the text file to be processed.
        event_id: The unique event ID associated with the file event. It is
            stored in the baseline record written for this file.

    Returns:
        None. Any exception raised while processing is logged, not propagated.
    """
    logger = logging.getLogger(__name__)
    try:
        # Load the baseline data
        baseline_data = {}
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    # Keep the stored ID under its own name so the caller's
                    # ``event_id`` argument is not overwritten while loading.
                    path, file_hash, stored_event_id = parts[0], parts[1], parts[2]
                    baseline_data[path] = {"hash": file_hash, "event_id": stored_event_id}

        # Calculate hash of the current text file
        current_hash = calculate_file_hash(filename)

        # Compare with baseline
        if filename not in baseline_data:
            logger.info(f"[Event ID: 101] [{filename}] New text file detected.")
        elif current_hash != baseline_data[filename]["hash"]:
            logger.info(f"[Event ID: 103] [{filename}] Text file changed.")
        else:
            logger.info(f"[Event ID: 100] [{filename}] No change in text file.")

        # Update baseline data with the new hash and the caller's event ID
        baseline_data[filename] = {"hash": current_hash, "event_id": event_id}

        # Save updated baseline data
        with open("baseline.txt", "w") as f:
            for path, info in baseline_data.items():
                f.write(f"{path}|{info['hash']}|{info['event_id']}\n")

    except Exception as e:
        logger.error(f"Error processing changes in {filename}: {e}")

def check_file_type(filename):
    """
    Checks the type of the file from its extension (case-insensitively).

    Args:
        filename: The path to the file.

    Returns:
        'excel' if the file is an Excel file, 'image' if it's an image file,
        'word' if it's a Word document, 'pdf' if it's a PDF document,
        'pptx' if it's a PowerPoint document, 'txt' if it's a plain text file,
        None otherwise.
    """
    # Extensions are matched on a lower-cased copy so "PHOTO.JPG" is recognised.
    lowered = filename.lower()
    if lowered.endswith('.xlsx'):
        return 'excel'
    elif lowered.endswith(('.jpg', '.jpeg', '.png', '.gif')):
        return 'image'
    elif lowered.endswith('.docx'):
        return 'word'
    elif lowered.endswith('.pdf'):
        return 'pdf'
    elif lowered.endswith('.pptx'):
        return 'pptx'
    elif lowered.endswith('.txt'):
        # process_file() dispatches on 'txt', so it has to be reported here.
        return 'txt'
    return None

def process_file(filename, event_id, safe_folder_path):
    """
    Copies a changed file to the safe folder and processes it based on its type.

    Args:
        filename: The path to the file to be processed.
        event_id: The unique event ID associated with the file event.
        safe_folder_path: The path to the safe folder where the file is copied.
            The folder is created if it does not exist.

    Returns:
        None. Errors from the copy or from the type-specific handler are
        logged, not propagated.
    """
    file_type = check_file_type(filename)
    logger = logging.getLogger(__name__)

    # The copy has its own try block so that a backup failure does not also
    # skip the type-specific processing below.
    try:
        os.makedirs(safe_folder_path, exist_ok=True)
        safe_file_path = os.path.join(safe_folder_path, os.path.basename(filename))
        # NOTE: the file on disk already holds the new content at this point,
        # so this copy is the changed version, not the pre-change original.
        shutil.copy2(filename, safe_file_path)
        logger.info(f"[Event ID: 106] [{filename}] File copied to safe folder as it has been changed.")
    except OSError as e:
        logger.error(f"Error copying {filename} to safe folder: {e}")

    try:
        # Process the file based on its type
        if file_type == 'excel':
            process_excel_changes(filename, event_id)
        elif file_type == 'image':
            process_image_changes(filename, event_id)
        elif file_type == 'word':
            process_word_changes(filename, event_id)
        elif file_type == 'pdf':
            process_pdf_changes(filename, event_id)
        elif file_type == 'pptx':
            process_pptx_changes(filename, event_id)
        elif file_type == 'txt':
            process_text_changes(filename, event_id)
        else:
            # Handle other file types here
            logger.info(f"[Event ID: 103] [{filename}] File has been changed.")
    except Exception as e:
        logger.error(f"Error processing {filename}: {e}")

def calculate_file_hash(filepath):
    """Calculates the SHA512 hash of a file.

    The file is read in fixed-size chunks so that large files do not have to
    be loaded into memory in one piece.

    Args:
        filepath: The path to the file.

    Returns:
        A string containing the hexadecimal SHA512 hash of the file.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha512()
    with open(filepath, "rb") as f:
        # iter() with a sentinel stops at the empty bytes object read() returns at EOF.
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()

def erase_existing_baseline():
    """
    Removes "baseline.txt" from the current working directory when it is present.

    Does nothing if the file does not exist.
    """
    baseline_file = "baseline.txt"
    if not os.path.exists(baseline_file):
        return
    os.remove(baseline_file)

def collect_baseline(target_folders):
    """
    Collects baseline information for files in the target folders and their subfolders.

    Any existing "baseline.txt" is deleted first. One "path|hash|event_id"
    record is then written for every file that could be hashed. Files that
    cannot be read are skipped with a warning so they do not abort the run.

    Args:
        target_folders: A list of paths to the target folders.
    """
    logger = logging.getLogger(__name__)
    erase_existing_baseline()

    # Collect baseline information for each target folder
    for target_folder in target_folders:
        # Records gathered for this target folder only
        file_info_dict = {}

        # Collect all files in the target folder and its subfolders
        for root, dirs, files in os.walk(target_folder):
            for file_name in files:
                full_path = os.path.join(root, file_name)
                try:
                    file_hash = calculate_file_hash(full_path)
                except OSError as e:
                    logger.warning(f"Skipping {full_path} while collecting baseline: {e}")
                    continue
                event_id = str(uuid.uuid4())  # Generate a UUID for the event
                file_info_dict[full_path] = {"hash": file_hash, "event_id": event_id}

        # Append mode: earlier target folders have already written their records
        with open("baseline.txt", "a") as baseline_file:
            for path, info in file_info_dict.items():
                baseline_file.write(f"{path}|{info['hash']}|{info['event_id']}\n")

def monitor_files(target_folders, safe_folder_path):
    """
    Monitor changes in files within the specified target folders and their subfolders.

    The baseline is loaded once from "baseline.txt" into an in-memory
    dictionary. The folders are then walked every five seconds. New folders and
    files are logged, files whose hash differs from the recorded one are handed
    to process_file(), and recorded paths that no longer exist are reported as
    renamed or deleted.

    Args:
        target_folders: A list of paths to the target folders.
        safe_folder_path: The path to the safe folder, passed on to
            process_file() for every changed file.

    Note:
        This function loops forever and does not return.
    """
    logger = logging.getLogger(__name__)
    file_info_dict = {}  # Maps path -> {"hash": ..., "event_id": ...}
    excluded_dirs = ['image files']  # Directories to exclude from monitoring

    try:
        with open("baseline.txt", "r") as f:
            for line in f:
                parts = line.strip().split("|")
                if len(parts) >= 3:
                    path = parts[0]
                    file_hash = parts[1]
                    event_id = parts[2]
                    file_info_dict[path] = {"hash": file_hash, "event_id": event_id}
    except Exception as e:
        print(f"Error loading baseline: {e}")

    while True:
        time.sleep(5)  # Delay for monitoring

        for target_folder in target_folders:
            for root, dirs, files in os.walk(target_folder):
                # Pruning ``dirs`` in place stops os.walk from descending into them
                dirs[:] = [d for d in dirs if d not in excluded_dirs]

                for d in dirs:
                    full_path = os.path.join(root, d)
                    if d not in [".", ".."] and full_path not in file_info_dict:
                        event_id = str(uuid.uuid4())  # Generate a UUID for the event
                        # Folders are tracked with an empty hash
                        file_info_dict[full_path] = {"hash": "", "event_id": event_id}
                        logger.info(f"[Event ID: 101] [{full_path}] New folder created.")

                for f in files:
                    full_path = os.path.join(root, f)
                    # The earlier unconditional "copy the original" step was
                    # removed from here: it used a local name that is only
                    # assigned in the rename scan below, so it raised
                    # UnboundLocalError on the first file. Changed files are
                    # still copied by process_file() once the hash comparison
                    # below detects a change.
                    try:
                        # Probe the file to detect files locked by another program
                        with open(full_path, "rb") as file_handle:
                            pass
                    except PermissionError:
                        print(f"\n{full_path} is in use, skipping...")
                        continue

                    # Check if the file is new (not in the baseline)
                    if full_path not in file_info_dict:
                        event_id = str(uuid.uuid4())  # Generate a UUID for the event
                        file_hash = calculate_file_hash(full_path)
                        file_info_dict[full_path] = {"hash": file_hash, "event_id": event_id}

                        # Log new file creation event
                        logger.info(f"[Event ID: 101] [{full_path}] New file detected.")

                    # Update baseline information for existing files
                    else:
                        current_hash = calculate_file_hash(full_path)
                        if current_hash != file_info_dict[full_path]["hash"]:
                            event_id = str(uuid.uuid4())  # Generate a UUID for the event
                            file_info_dict[full_path]["hash"] = current_hash
                            file_info_dict[full_path]["event_id"] = event_id
                            process_file(full_path, event_id, safe_folder_path)

            # Check for renamed/deleted files (iterate a snapshot: the dict is mutated below)
            for path in list(file_info_dict.keys()):
                if not os.path.exists(path):
                    if path in file_info_dict:
                        # A file with the same hash under another name in the
                        # same directory tree is treated as the renamed file
                        renamed = False
                        for root, dirs, files in os.walk(os.path.dirname(path)):
                            if not renamed:
                                for file_name in files:
                                    candidate_path = os.path.join(root, file_name)
                                    if calculate_file_hash(candidate_path) == file_info_dict[path]["hash"] and candidate_path != path:
                                        renamed = True
                                        new_path = candidate_path
                                        break

                        if renamed:
                            print(f"\n{path} has been renamed to {new_path}!")
                            # Log file renaming event
                            logger.info(f"[Event ID: 104] File at path: {path} has been renamed to {new_path}")
                            file_info_dict[new_path] = file_info_dict.pop(path)  # Update dictionary with new path
                        else:
                            print(f"\n{path} has been deleted!")
                            # Log file deletion event
                            logger.info(f"[Event ID: 102] File at path: {path} has been deleted")
                            del file_info_dict[path]
                    else:
                        # This case shouldn't happen, but if it does, log it as a deletion
                        print(f"\n{path} has been deleted!")
                        # Log file deletion event
                        logger.info(f"[Event ID: 102] File at path: {path} has been deleted")

    # Add the necessary logging configuration and handlers here

def file_modified(file_path):
    """
    Placeholder modification check that reports every file as modified.

    Args:
        file_path: The path to the file to check (currently unused).

    Returns:
        Always True until real modification-detection logic is implemented.
    """
    # TODO: replace with an actual modification check.
    return True

def copy_original_file(file_path, safe_folder=None):
    """
    Copies a file into the safe folder, keeping its base name and metadata.

    Args:
        file_path: The path to the file to copy.
        safe_folder: The destination folder. When omitted, the module-level
            ``safe_folder_path`` is used; that name is only defined when this
            file is run as a script.

    Raises:
        NameError: If ``safe_folder`` is omitted and no module-level
            ``safe_folder_path`` exists.
        OSError: If the folder cannot be created or the copy fails.
    """
    if safe_folder is None:
        # Backward-compatible fallback to the global set in the __main__ block
        safe_folder = safe_folder_path
    os.makedirs(safe_folder, exist_ok=True)
    safe_file_path = os.path.join(safe_folder, os.path.basename(file_path))
    shutil.copy2(file_path, safe_file_path)
    print(f"Original file copied to safe folder: {file_path}")

# Mirror this module's log records into a log file.
log_file = "folder_logs.log"

# Record layout: timestamp, logger name, level, message
formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')

# Handler that writes INFO-and-above records to the log file
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)

# Attach the handler to the module-level logger
logger.addHandler(file_handler)

if __name__ == "__main__":
    # Log file, safe folder and the folders to watch
    log_file = "folder_logs.log"
    safe_folder_path = r"C:\safe_folder"
    target_folders = [
        r"D:\Academics\Engineering\Final_Year_Proj\FIM_Tool\Sample\Files",
        r"H:\Temp",
        r"E:\shivv proj",
    ]

    # Configure logging
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format='%(asctime)s %(name)s %(levelname)s %(message)s',
    )

    # Menu loop: keep asking until the user picks an action
    while True:
        print("\nWhat would you like to do?")
        print("  A) Collect new Baseline?")
        print("  B) Begin monitoring files with saved Baseline?")
        response = input("\nPlease enter 'A' or 'B': ").upper()

        if response not in ("A", "B"):
            print("Invalid input. Please enter 'A' or 'B'.")
            continue

        if response == "A":
            collect_baseline(target_folders)
        else:
            monitor_files(target_folders, safe_folder_path)
here

现在在我的代码中,文件被自动复制到安全文件夹中,但复制的是更改的文件而不是原始文件。我希望将更改前的文件复制到安全文件夹。

python file monitoring
1个回答
0
投票

我认为你想做的事情是不可能的。当您注意到更改时,该文件已经被修改。原件已不存在,因此无法复制到任何地方。

您可以做的是创建所有文件的临时副本,当检测到更改时,将临时副本复制到您的安全文件夹,然后用新文件替换临时副本。不过,它可能需要大量内存,具体取决于您正在监视的文件的总大小。

如果可以确定文件只会被 Word 或 OpenOffice 等软件修改,您可以监控锁定文件。这些软件在编辑过程中会创建一个名为 `.~lock.{name of file}` 的锁定文件,以防止同一文件同时在两个不同的程序中被编辑。您可以检查是否存在锁定文件;一旦发现锁定文件,就为正在编辑的文件创建一个临时副本。

此外,如果您想要的只是某种版本控制方法,那么使用 git 可能会更容易。

© www.soinside.com 2019 - 2024. All rights reserved.