Python 脚本按年和月对邮件进行排序(嵌套标签和速度)

问题描述 投票:0回答:1

我需要一个 Python 脚本来自动按年份和月份对电子邮件进行排序。一般来说,可以直接在 Gmail 中执行搜索或过滤操作,但由于此收件箱中的电子邮件数量巨大,所有这些操作都会失败。

因此,我尝试使用以下脚本来实现此目的:

import os
import re
import datetime
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# OAuth scopes requested during authorization. The script both manages labels
# and modifies messages (adds labels to them), which requires `gmail.modify`;
# the previous value `gmail.label` is not a valid Gmail API scope.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']

def create_label_if_not_exists(service, user_id, label_name):
    """Create the label named ``label_name`` unless one with that name exists.

    The existence check lists the user's labels and compares display names.
    ``labels().get`` cannot be used for this because it looks labels up by
    their ID, not by their name.

    Args:
        service: Gmail API service object (as returned by ``build``).
        user_id: Gmail user ID, e.g. ``'me'``.
        label_name: Display name of the label to ensure.

    Returns:
        None. Outcomes are reported on stdout.
    """
    try:
        response = service.users().labels().list(userId=user_id).execute()
    except HttpError as e:
        print('Error checking label:', e)
        return

    existing_names = {label['name'] for label in response.get('labels', [])}
    if label_name in existing_names:
        print('Label already exists:', label_name)
        return

    create_label(service, user_id, label_name)

def create_label(service, user_id, label_name):
    """Create a visible Gmail label called ``label_name`` for ``user_id``.

    The label is shown both in the label list and in the message list.
    An ``HttpError`` from the API is reported on stdout and not re-raised.
    """
    new_label = dict(
        name=label_name,
        messageListVisibility='show',
        labelListVisibility='labelShow',
    )
    try:
        response = service.users().labels().create(userId=user_id, body=new_label).execute()
        print('Label created:', response['name'])
    except HttpError as e:
        print('Error creating label:', e)

def apply_label(service, user_id, message_id, label_id):
    """Add the label with ID ``label_id`` to the message ``message_id``.

    An ``HttpError`` from the API is reported on stdout and not re-raised.
    """
    try:
        service.users().messages().modify(
            userId=user_id,
            id=message_id,
            body={'addLabelIds': [label_id]},
        ).execute()
        print('Label applied to email:', label_id)
    except HttpError as e:
        print('Error applying label to email:', e)

def extract_year_month(date_str, tz=None):
    """Turn an e-mail ``Date`` header value into a ``"YYYY-MM"`` label name.

    Parsing is delegated to ``email.utils.parsedate_to_datetime``, which
    accepts the usual header variants: optional day-of-week prefix, missing
    seconds, numeric or named zones, and trailing comments such as ``(CET)``.

    Args:
        date_str: Raw value of the ``Date`` header.
        tz: Optional ``datetime.tzinfo``. When given and the header carries a
            usable UTC offset, the timestamp is converted to this zone before
            the year and month are taken, so messages sent around midnight
            are grouped consistently. With the default ``None`` the sender's
            wall-clock date is used.

    Returns:
        The label name, e.g. ``"2024-03"``, or ``None`` if the value cannot
        be parsed (a message is printed in that case).
    """
    # Function-scope import so the block does not depend on the file header.
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError for unparseable input,
        # newer ones raise ValueError.
        print("Failed to parse 'Created at' date:", date_str)
        return None

    # Only aware datetimes can be converted; headers without a usable offset
    # yield a naive datetime and are used as-is.
    if tz is not None and date_obj.tzinfo is not None:
        date_obj = date_obj.astimezone(tz)

    return f"{date_obj.year}-{date_obj.month:02d}"


def get_label_id(service, user_id, label_name):
    """Look up the ID of the label whose name equals ``label_name``.

    Lists the user's labels and returns the ID of the first exact name
    match, or ``None`` when no label has that name.
    """
    response = service.users().labels().list(userId=user_id).execute()
    matching_ids = (
        entry['id']
        for entry in response.get('labels', [])
        if entry['name'] == label_name
    )
    return next(matching_ids, None)

def get_emails(service, user_id, query, max_results=500):
    """Collect all message stubs matching ``query``, following pagination.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID, e.g. ``'me'``.
        query: Gmail search query string.
        max_results: Page size requested per ``messages.list`` call. The API
            default is 100; asking for the documented maximum of 500 reduces
            the number of round trips on large mailboxes.

    Returns:
        A list of the ``messages`` entries from every result page, in the
        order the API returned them.
    """
    messages = []
    page_token = None
    while True:
        response = service.users().messages().list(
            userId=user_id,
            q=query,
            pageToken=page_token,
            maxResults=max_results,
        ).execute()
        messages.extend(response.get('messages', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return messages

def apply_or_create_label(service, user_id, message_id, label_name):
    """Label a message with ``label_name``, creating the label if necessary.

    The label ID is looked up by name; if it is missing the label is created
    and looked up again. Any exception is reported on stdout and swallowed so
    one failing message does not stop the caller.
    """
    try:
        resolved_id = get_label_id(service, user_id, label_name)
        if not resolved_id:
            # Unknown label: create it, then re-resolve its ID.
            create_label(service, user_id, label_name)
            resolved_id = get_label_id(service, user_id, label_name)
            if not resolved_id:
                print("Failed to apply label:", label_name)
                return
        apply_label(service, user_id, message_id, resolved_id)
    except Exception as e:
        print("Error applying or creating label:", e)

def main():
    """Label every message in the mailbox with its ``YYYY-MM`` send date.

    Steps:
      1. Load cached OAuth credentials from ``token.json``, refreshing them or
         running the interactive flow when needed, and store the result.
      2. List all messages matching ``in:all``.
      3. For each message read its ``Date`` header and apply the matching
         ``YYYY-MM`` label (created on demand). Messages without a ``Date``
         header receive the label of the previously labelled message.
    """
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json')
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # Imported here because the file header does not import it; without
            # this the first-time authorization path raised NameError.
            from google_auth_oauthlib.flow import InstalledAppFlow

            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    service = build('gmail', 'v1', credentials=creds)

    user_id = 'me'

    query = 'in:all'

    messages = get_emails(service, user_id, query)

    previous_label = None

    for message in messages:
        # Only the Date header is needed, so request metadata instead of the
        # full message (bodies and attachments) to keep responses small.
        msg = service.users().messages().get(
            userId=user_id,
            id=message['id'],
            format='metadata',
            metadataHeaders=['Date'],
        ).execute()
        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive; some senders emit "date".
        date_header = next(
            (header['value'] for header in headers if header['name'].lower() == 'date'),
            None,
        )

        if not date_header:
            print("Date header not found in email, skipping...")
            if previous_label:
                apply_or_create_label(service, user_id, message['id'], previous_label)
            else:
                print("No previous label found to apply.")
            continue

        label_name = extract_year_month(date_header)
        if label_name:
            # apply_or_create_label reports and swallows its own errors, so no
            # additional try/except is needed here.
            apply_or_create_label(service, user_id, message['id'], label_name)
            previous_label = label_name


if __name__ == '__main__':
    main()

脚本简要说明:

  • 扫描“所有邮件”并提取日期,按年和月对它们进行排序。
  • 我尝试对日期进行标准化,因为有时它会在开头带有星期几,或在结尾带有不同的时区标识(例如 (CET)、GMT 等)。
  • 当无法获取电子邮件的日期时,将应用上一封电子邮件的标签作为后备。

问题:

  1. 我想创建一个父标签“年度概述”,其下以各年份作为子标签,每个年份标签下再包含各月份。我尝试过这样做,但每当我创建“年度概述”父标签时,子标签始终显示在同一层级上。(如果需要,我可以在这里发布我那段无法正常运行的代码。)

  2. 有什么方法可以提高这个脚本的速度,或者由于本地计算机上没有存储数据,我无能为力?

  3. 我可以忍受的一个小问题,因为电子邮件有不同的时区,有时午夜左右电子邮件无法正确排序。如果有快速解决方案,我会接受,但这并没有真正困扰我。

感谢您的帮助!

python gmail gmail-api
1个回答
0
投票

我现在已成功实现了想要的标签创建方式。总体而言,脚本按预期工作,只是在脚本运行较长时间后会出现一个错误(见下文)。

# Import necessary libraries
import os
import re
import datetime
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
import time

# Gmail API OAuth scopes requested when the user authorizes this script.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
]

def create_label_if_not_exists(service, user_id, label_name):
    """Create the label named ``label_name`` unless one with that name exists.

    The existence check lists the user's labels and compares display names.
    ``labels().get`` cannot be used for this because it looks labels up by
    their ID, not by their name.

    Args:
        service: Gmail API service object (as returned by ``build``).
        user_id: Gmail user ID, e.g. ``'me'``.
        label_name: Display name of the label to ensure.

    Returns:
        None. Outcomes are reported on stdout.
    """
    try:
        response = service.users().labels().list(userId=user_id).execute()
    except HttpError as e:
        print('Error checking label:', e)
        return

    existing_names = {label['name'] for label in response.get('labels', [])}
    if label_name in existing_names:
        print('Label already exists:', label_name)
        return

    create_label(service, user_id, label_name)

def create_label(service, user_id, label_name):
    """Create a visible Gmail label called ``label_name`` for ``user_id``.

    The label is shown both in the label list and in the message list.
    An ``HttpError`` from the API is reported on stdout and not re-raised.
    """
    new_label = dict(
        name=label_name,
        messageListVisibility='show',
        labelListVisibility='labelShow',
    )
    try:
        response = service.users().labels().create(userId=user_id, body=new_label).execute()
        print('Label created:', response['name'])
    except HttpError as e:
        print('Error creating label:', e)

def apply_label_with_retry(service, user_id, message_id, label_id, label_name, max_retries=3, delay=2):
    """Add a label to a message, retrying with a doubling delay on failure.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID, e.g. ``'me'``.
        message_id: ID of the message to label.
        label_id: ID of the label to add.
        label_name: Label name, used only in progress output.
        max_retries: Total number of attempts.
        delay: Seconds to wait after the first failure; doubled after each
            subsequent failure.

    Returns:
        None, both on success and after the final failed attempt.
    """
    wait_seconds = delay
    for attempt_no in range(1, max_retries + 1):
        try:
            request_body = {'addLabelIds': [label_id]}
            service.users().messages().modify(userId=user_id, id=message_id, body=request_body).execute()
            print('Label applied to email:', label_name)
            return
        except Exception as e:
            print(f"Error applying label to email (attempt {attempt_no}/{max_retries}): {e}")
            if attempt_no == max_retries:
                print("Maximum retry attempts reached. Aborting.")
            else:
                print(f"Retrying in {wait_seconds} seconds...")
                time.sleep(wait_seconds)
                wait_seconds *= 2

def extract_year_month(date_str, tz=None):
    """Turn an e-mail ``Date`` header value into a ``"YYYY-MM"`` label name.

    Parsing is delegated to ``email.utils.parsedate_to_datetime``, which
    accepts the usual header variants: optional day-of-week prefix, missing
    seconds, numeric or named zones, and trailing comments such as ``(CET)``.

    Args:
        date_str: Raw value of the ``Date`` header.
        tz: Optional ``datetime.tzinfo``. When given and the header carries a
            usable UTC offset, the timestamp is converted to this zone before
            the year and month are taken, so messages sent around midnight
            are grouped consistently. With the default ``None`` the sender's
            wall-clock date is used.

    Returns:
        The label name, e.g. ``"2024-03"``, or ``None`` if the value cannot
        be parsed (a message is printed in that case).
    """
    # Function-scope import so the block does not depend on the file header.
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError for unparseable input,
        # newer ones raise ValueError.
        print("Failed to parse the date header:", date_str)
        return None

    # Only aware datetimes can be converted; headers without a usable offset
    # yield a naive datetime and are used as-is.
    if tz is not None and date_obj.tzinfo is not None:
        date_obj = date_obj.astimezone(tz)

    return f"{date_obj.year}-{date_obj.month:02d}"

def get_label_id(service, user_id, label_name):
    """Look up the ID of the label whose name equals ``label_name``.

    Lists the user's labels and returns the ID of the first exact name
    match, or ``None`` when no label has that name.
    """
    response = service.users().labels().list(userId=user_id).execute()
    matching_ids = (
        entry['id']
        for entry in response.get('labels', [])
        if entry['name'] == label_name
    )
    return next(matching_ids, None)

def get_emails(service, user_id, query, max_results=500):
    """Collect all message stubs matching ``query``, following pagination.

    Args:
        service: Gmail API service object.
        user_id: Gmail user ID, e.g. ``'me'``.
        query: Gmail search query string.
        max_results: Page size requested per ``messages.list`` call. The API
            default is 100; asking for the documented maximum of 500 reduces
            the number of round trips on large mailboxes.

    Returns:
        A list of the ``messages`` entries from every result page, in the
        order the API returned them.
    """
    messages = []
    page_token = None
    while True:
        response = service.users().messages().list(
            userId=user_id,
            q=query,
            pageToken=page_token,
            maxResults=max_results,
        ).execute()
        messages.extend(response.get('messages', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return messages

def apply_or_create_label(service, user_id, message_id, label_name):
    """Label a message with ``label_name``, creating the label if necessary.

    The label ID is looked up by name; if it is missing the label is created
    and looked up again. Any exception is reported on stdout and swallowed so
    one failing message does not stop the caller.
    """
    try:
        resolved_id = get_label_id(service, user_id, label_name)
        if not resolved_id:
            # Unknown label: create it, then re-resolve its ID.
            create_label(service, user_id, label_name)
            resolved_id = get_label_id(service, user_id, label_name)
            if not resolved_id:
                print("Failed to apply label:", label_name)
                return
        apply_label_with_retry(service, user_id, message_id, resolved_id, label_name)
    except Exception as e:
        print("Error applying or creating label:", e)

def _ensure_label(service, user_id, label_name, label_ids):
    """Return the ID of ``label_name``, creating the label if needed.

    Resolved IDs are memoized in the ``label_ids`` dict so the label list is
    fetched from the API only the first time a given name is seen.
    """
    label_id = label_ids.get(label_name)
    if label_id:
        return label_id
    label_id = get_label_id(service, user_id, label_name)
    if not label_id:
        create_label(service, user_id, label_name)
        label_id = get_label_id(service, user_id, label_name)
    if label_id:
        label_ids[label_name] = label_id
    return label_id


def main():
    """Sort all mail into nested ``Yearly Overview/<year>/<month>`` labels.

    Steps:
      1. Load cached OAuth credentials from ``token.json``, refreshing them or
         running the interactive flow when needed, and store the result.
      2. Ensure the ``Yearly Overview`` parent label exists.
      3. For every message matching ``in:all``, read its ``Date`` header and
         apply the matching month label, creating year and month labels on
         demand. Messages whose date is missing or unparseable receive the
         label applied to the previous message (initially the parent label).
      4. Print timing statistics.
    """
    start_time = int(time.time())  # Unix timestamp for start time

    # Load credentials from token file or authenticate
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json')
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    # Build Gmail service
    service = build('gmail', 'v1', credentials=creds)

    user_id = 'me'

    # Label name -> label ID. Avoids re-listing all labels for every message.
    label_ids = {}

    # Ensure the parent label exists
    root_label_name = "Yearly Overview"
    yearly_overview_label_id = _ensure_label(service, user_id, root_label_name, label_ids)

    # Process emails in "All Mail"
    query = 'in:all'
    messages = get_emails(service, user_id, query)

    # Fallback label (ID and name) for messages without a usable date
    previous_label_id = yearly_overview_label_id
    previous_label_name = root_label_name

    for message in messages:
        # Only the Date header is needed, so request metadata instead of the
        # full message (bodies and attachments) to keep responses small.
        msg = service.users().messages().get(
            userId=user_id,
            id=message['id'],
            format='metadata',
            metadataHeaders=['Date'],
        ).execute()
        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive; some senders emit "date".
        date_header = next(
            (header['value'] for header in headers if header['name'].lower() == 'date'),
            None,
        )

        label_name = extract_year_month(date_header) if date_header else None
        month_label_id = None
        month_label_name = None
        if label_name:
            year, month = label_name.split('-')
            # Ensure the year label exists before the month label nested under it.
            _ensure_label(service, user_id, f"Yearly Overview/{year}", label_ids)
            month_label_name = f"Yearly Overview/{year}/{month}"
            month_label_id = _ensure_label(service, user_id, month_label_name, label_ids)

        if month_label_id:
            apply_label_with_retry(service, user_id, message['id'], month_label_id, month_label_name)
            previous_label_id = month_label_id
            previous_label_name = month_label_name
            continue

        if date_header:
            print("Failed to determine monthly label, applying previous label...")
        else:
            print("Date header not found in email, applying previous label...")
        if previous_label_id:
            apply_label_with_retry(service, user_id, message['id'], previous_label_id, previous_label_name)
        else:
            print("No previous label found to apply.")

    end_time = int(time.time())  # Unix timestamp for end time

    # Output start time and end time
    print("Start Time:", start_time)
    print("Formatted Time:", datetime.datetime.fromtimestamp(start_time).strftime("%H:%M:%S"))
    print("End Time:", end_time)
    print("Formatted Time:", datetime.datetime.fromtimestamp(end_time).strftime("%H:%M:%S"))

    # Calculate duration
    duration_seconds = end_time - start_time
    print("Duration (seconds):", duration_seconds)

    # Get user profile to retrieve total messages
    profile = service.users().getProfile(userId=user_id).execute()
    total_messages = profile['messagesTotal']
    print("Total Messages:", total_messages)

    # Calculate time taken to process one message
    if total_messages > 0:
        seconds_per_message = duration_seconds / total_messages
        # Limiting to 3 digits after the decimal point
        seconds_per_message = "{:.3f}".format(seconds_per_message)
        print("Seconds per Message:", seconds_per_message)


if __name__ == '__main__':
    main()

错误:

Traceback (most recent call last):
  File "labelyear.py", line 201, in <module>
    main()
  File "labelyear.py", line 150, in main
    year_label_id = get_label_id(service, user_id, year_label_name)
  File "labelyear.py", line 71, in get_label_id
    labels_list = service.users().labels().list(userId=user_id).execute()
  File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\googleapiclient\_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\googleapiclient\http.py", line 938, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://gmail.googleapis.com/gmail/v1/users/me/labels?alt=json returned "Precondition check failed.". Details: "[{'message': 'Precondition check failed.', 'domain': 'global', 'reason': 'failedPrecondition'}]">

在 Google 上搜索后发现,此错误与缺少某些委托(delegation)有关。但我只是在 Gmail 帐户上运行此脚本,不确定是否需要委托。

© www.soinside.com 2019 - 2024. All rights reserved.