我需要一个 Python 脚本，用来自动按年份和月份对电子邮件进行归类。一般来说，可以直接在 Gmail 中执行搜索或过滤操作，但由于此收件箱中的电子邮件数量巨大，这些操作全都会失败。
因此,我尝试使用以下脚本来实现此目的:
import os
import re
import datetime
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# OAuth scope requested from Google. This script lists and reads messages,
# creates labels and adds labels to messages, so it needs 'gmail.modify'.
# The previous value ('.../auth/gmail.label') is not a Gmail API scope, and the
# labels-only scope would not permit the messages.list/get/modify calls below.
# NOTE(review): a token.json issued for a different scope presumably has to be
# deleted so that the consent flow runs again - confirm.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
def create_label_if_not_exists(service, user_id, label_name):
    """Create the Gmail label ``label_name`` unless one with that name exists.

    The lookup is done by *name* against the result of ``labels().list``.
    The earlier implementation passed the name as the ``id`` argument of
    ``labels().get``, which expects a label id, so an existing user label was
    never recognised.

    Args:
        service: Gmail API service object (as returned by ``build``).
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Display name of the label to ensure.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    try:
        response = service.users().labels().list(userId=user_id).execute()
    except HttpError as e:
        print('Error checking label:', e)
        return

    existing_names = {label['name'] for label in response.get('labels', [])}
    if label_name in existing_names:
        print('Label already exists:', label_name)
        return

    # create_label reports its own HttpError (including 409 conflicts).
    create_label(service, user_id, label_name)
def create_label(service, user_id, label_name):
    """Create a Gmail label that is visible in the label and message lists.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Name of the label to create.

    Returns:
        None. The outcome (created name or the HttpError) is printed.
    """
    request_body = dict(
        name=label_name,
        messageListVisibility='show',
        labelListVisibility='labelShow',
    )
    try:
        response = service.users().labels().create(
            userId=user_id,
            body=request_body,
        ).execute()
        print('Label created:', response['name'])
    except HttpError as e:
        print('Error creating label:', e)
def apply_label(service, user_id, message_id, label_id):
    """Add the label ``label_id`` to the message ``message_id``.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        message_id: Id of the message to modify.
        label_id: Id of the label to add.

    Returns:
        None. Success (with the label id) or the HttpError is printed.
    """
    try:
        service.users().messages().modify(
            userId=user_id,
            id=message_id,
            body={'addLabelIds': [label_id]},
        ).execute()
    except HttpError as e:
        print('Error applying label to email:', e)
    else:
        print('Label applied to email:', label_id)
def extract_year_month(date_str, tz=None):
    """Turn an RFC 2822 ``Date`` header value into a ``"YYYY-MM"`` label name.

    Parsing is delegated to ``email.utils.parsedate_to_datetime`` instead of a
    regex plus ``strptime``: it accepts headers with or without a weekday,
    with or without seconds and with a trailing ``(TZ)`` comment, and it does
    not depend on the process locale for day/month names.

    Args:
        date_str: Value of the message's ``Date`` header.
        tz: Optional ``datetime.tzinfo``. When given, the timestamp is
            converted to that zone before year and month are taken, so mails
            sent around midnight in other time zones land in a consistent
            month. When omitted, the wall-clock date written in the header is
            used (the previous behaviour).

    Returns:
        The label name such as ``"2024-03"``, or None if the header cannot be
        parsed (a message is printed in that case).
    """
    # Local import: email.utils is not part of this file's import block.
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError.
        print("Failed to parse 'Created at' date:", date_str)
        return None

    if tz is not None:
        if date_obj.tzinfo is None:
            # A "-0000" offset yields a naive datetime whose value is UTC.
            date_obj = date_obj.replace(tzinfo=datetime.timezone.utc)
        date_obj = date_obj.astimezone(tz)

    return f"{date_obj.year}-{date_obj.month:02d}"
def get_label_id(service, user_id, label_name):
    """Look up a label id by its display name.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Exact label name to search for.

    Returns:
        The id of the first label whose name equals ``label_name``, or None
        when no label matches.
    """
    response = service.users().labels().list(userId=user_id).execute()
    return next(
        (
            entry['id']
            for entry in response.get('labels', [])
            if entry['name'] == label_name
        ),
        None,
    )
def get_emails(service, user_id, query, max_results=500):
    """Collect the id stubs of every message that matches ``query``.

    Follows ``nextPageToken`` until the listing is exhausted.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        query: Gmail search query, e.g. ``'in:all'``.
        max_results: Page size sent as ``maxResults``. The API default is 100
            per page; asking for larger pages means fewer round-trips on a
            large mailbox.

    Returns:
        A list with the ``messages`` entries of all pages, in listing order.
    """
    messages = []
    page_token = None
    while True:
        response = service.users().messages().list(
            userId=user_id,
            q=query,
            pageToken=page_token,
            maxResults=max_results,
        ).execute()
        messages.extend(response.get('messages', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return messages
def apply_or_create_label(service, user_id, message_id, label_name):
    """Label a message with ``label_name``, creating the label on demand.

    The label id is looked up by name; if it is missing the label is created
    and looked up again. Any exception is caught and printed, so this
    function never raises.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        message_id: Id of the message to label.
        label_name: Name of the label to apply.

    Returns:
        None.
    """
    try:
        label_id = get_label_id(service, user_id, label_name)
        if not label_id:
            # Unknown label: create it, then resolve its id once more.
            create_label(service, user_id, label_name)
            label_id = get_label_id(service, user_id, label_name)

        if label_id:
            apply_label(service, user_id, message_id, label_id)
        else:
            print("Failed to apply label:", label_name)
    except Exception as e:
        print("Error applying or creating label:", e)
def main():
    """Label every message in the mailbox with its ``YYYY-MM`` send month.

    Steps: load or obtain OAuth credentials (cached in ``token.json``), list
    all messages matching ``in:all``, read each message's ``Date`` header and
    apply the matching label, creating it when needed. A message without a
    ``Date`` header receives the label of the previously processed message,
    if there is one.
    """
    # Imported locally because this script's import block does not bring
    # InstalledAppFlow into scope; without it the first-time authorization
    # branch below fails with NameError.
    from google_auth_oauthlib.flow import InstalledAppFlow

    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json')
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the fresh or refreshed credentials for the next run.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    service = build('gmail', 'v1', credentials=creds)
    user_id = 'me'
    query = 'in:all'
    messages = get_emails(service, user_id, query)

    previous_label = None
    for message in messages:
        # Only the Date header is needed, so ask for metadata instead of
        # downloading the full message body and attachments.
        msg = service.users().messages().get(
            userId=user_id,
            id=message['id'],
            format='metadata',
            metadataHeaders=['Date'],
        ).execute()
        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive ("Date", "date", "DATE").
        date_header = next(
            (header['value'] for header in headers
             if header['name'].lower() == 'date'),
            None,
        )

        if date_header:
            label_name = extract_year_month(date_header)
            if label_name:
                # apply_or_create_label reports its own errors and never
                # raises, so no extra exception handler is needed here.
                apply_or_create_label(service, user_id, message['id'], label_name)
                previous_label = label_name
        else:
            print("Date header not found in email, skipping...")
            if previous_label:
                apply_or_create_label(service, user_id, message['id'], previous_label)
            else:
                print("No previous label found to apply.")
# Run the labelling job only when this file is executed as a script.
if __name__ == '__main__':
    main()
脚本简要说明:
问题:
我想创建一个父标签“年度概述”（Yearly Overview），其下以年份作为子标签，每个年份标签下再包含各月份的子标签。我尝试过这样做，但每当我创建“年度概述”父标签时，子标签总是与它显示在同一级别上。（如果需要，我可以在这里贴出我那段无法正常工作的代码。）
有什么方法可以提高这个脚本的速度,或者由于本地计算机上没有存储数据,我无能为力?
还有一个我可以忍受的小问题：由于电子邮件的时区各不相同，午夜前后的邮件有时不会被归入正确的月份。如果有快速的解决办法，我乐于采用，但这个问题并没有真正困扰我。
感谢您的帮助!
我现在成功添加了我想要的标签创建。一般来说,脚本按预期工作,除了当脚本运行较长时间时出现一个错误(见下文)。
# Import necessary libraries
import os
import re
import datetime
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
import time
# OAuth scope requested from Google during authorization. The code in this
# script lists and reads messages, creates labels and adds labels to messages
# using this single scope.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
# Function to create a Gmail label if it doesn't exist
def create_label_if_not_exists(service, user_id, label_name):
    """Create the Gmail label ``label_name`` unless one with that name exists.

    Existence is checked by *name* against ``labels().list``. The earlier
    implementation handed the name to ``labels().get`` as the ``id`` argument,
    which expects a label id, so an existing user label was never found.

    Args:
        service: Gmail API service object (as returned by ``build``).
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Display name of the label to ensure; nested labels use
            ``/`` as separator, e.g. ``'Yearly Overview/2024'``.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    try:
        listing = service.users().labels().list(userId=user_id).execute()
    except HttpError as e:
        print('Error checking label:', e)
        return

    if any(label['name'] == label_name for label in listing.get('labels', [])):
        print('Label already exists:', label_name)
        return

    # create_label reports its own HttpError (including 409 conflicts).
    create_label(service, user_id, label_name)
# Function to create a Gmail label
def create_label(service, user_id, label_name):
    """Create a Gmail label shown in both the label list and message list.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Name of the label to create.

    Returns:
        None. Either the created label's name or the HttpError is printed.
    """
    spec = {
        'name': label_name,
        'messageListVisibility': 'show',
        'labelListVisibility': 'labelShow',
    }
    try:
        create_request = service.users().labels().create(userId=user_id, body=spec)
        new_label = create_request.execute()
        print('Label created:', new_label['name'])
    except HttpError as e:
        print('Error creating label:', e)
# Function to apply a label to an email with retry logic
def apply_label_with_retry(service, user_id, message_id, label_id, label_name, max_retries=3, delay=2):
    """Add a label to a message, retrying with exponential backoff on errors.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        message_id: Id of the message to modify.
        label_id: Id of the label to add. A missing id (None or empty) is
            rejected up front instead of being sent to the API and retried.
        label_name: Label name, used only for log output.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait before the second attempt; doubled after every
            further failure.

    Returns:
        True when the label was applied, False when no label id was given or
        every attempt failed. (Earlier versions returned None in all cases,
        so callers that ignore the result are unaffected.)
    """
    if not label_id:
        print("Failed to apply label:", label_name)
        return False

    body = {'addLabelIds': [label_id]}
    for attempt in range(max_retries):
        try:
            service.users().messages().modify(userId=user_id, id=message_id, body=body).execute()
        except Exception as e:
            # Best effort by design: any failure is logged and retried.
            print(f"Error applying label to email (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff for subsequent retries
            else:
                print("Maximum retry attempts reached. Aborting.")
        else:
            print('Label applied to email:', label_name)
            return True
    return False
# Function to extract year and month from a date string
def extract_year_month(date_str, tz=None):
    """Turn an RFC 2822 ``Date`` header value into a ``"YYYY-MM"`` string.

    Parsing is delegated to ``email.utils.parsedate_to_datetime`` instead of a
    regex plus ``strptime``: it accepts headers with or without a weekday,
    with or without seconds and with a trailing ``(TZ)`` comment, and it does
    not depend on the process locale for day/month names.

    Args:
        date_str: Value of the message's ``Date`` header.
        tz: Optional ``datetime.tzinfo``. When given, the timestamp is
            converted to that zone before year and month are taken, so mails
            sent around midnight in other time zones are filed consistently.
            When omitted, the wall-clock date written in the header is used
            (the previous behaviour).

    Returns:
        A string such as ``"2024-03"``, or None if the header cannot be parsed
        (a message is printed in that case).
    """
    # Local import: email.utils is not part of this file's import block.
    from email.utils import parsedate_to_datetime

    try:
        date_obj = parsedate_to_datetime(date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError.
        print("Failed to parse the date header:", date_str)
        return None

    if tz is not None:
        if date_obj.tzinfo is None:
            # A "-0000" offset yields a naive datetime whose value is UTC.
            date_obj = date_obj.replace(tzinfo=datetime.timezone.utc)
        date_obj = date_obj.astimezone(tz)

    return f"{date_obj.year}-{date_obj.month:02d}"
# Function to get label ID by label name
def get_label_id(service, user_id, label_name):
    """Resolve a label's display name to its Gmail label id.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        label_name: Exact label name to search for.

    Returns:
        The id of the first label named ``label_name``, or None if the
        account's label listing contains no such name.
    """
    listing = service.users().labels().list(userId=user_id).execute()
    candidates = filter(
        lambda item: item['name'] == label_name,
        listing.get('labels', []),
    )
    # filter() is lazy, so scanning stops at the first match.
    for match in candidates:
        return match['id']
    return None
# Function to retrieve emails matching a query
def get_emails(service, user_id, query, max_results=500):
    """Retrieve the id stubs of all messages matching ``query``.

    Pages through the listing via ``nextPageToken`` until no token is left.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        query: Gmail search query, e.g. ``'in:all'``.
        max_results: Page size sent as ``maxResults``. The API default is 100
            per page; larger pages reduce the number of list requests needed
            for a big mailbox.

    Returns:
        A list with the ``messages`` entries of every page, in listing order.
    """
    collected = []
    page_token = None
    while True:
        page = service.users().messages().list(
            userId=user_id,
            q=query,
            pageToken=page_token,
            maxResults=max_results,
        ).execute()
        collected.extend(page.get('messages', []))
        page_token = page.get('nextPageToken')
        if not page_token:
            break
    return collected
# Function to apply or create label for an email
def apply_or_create_label(service, user_id, message_id, label_name):
    """Apply ``label_name`` to a message, creating the label when missing.

    The label id is resolved by name; if that fails the label is created and
    resolved once more. Every exception is caught and printed, so this
    function never raises.

    Args:
        service: Gmail API service object.
        user_id: Gmail user id, e.g. ``'me'``.
        message_id: Id of the message to label.
        label_name: Name of the label to apply.

    Returns:
        None.
    """
    try:
        resolved_id = get_label_id(service, user_id, label_name)
        if resolved_id:
            apply_label_with_retry(service, user_id, message_id, resolved_id, label_name)
            return

        # Label does not exist yet: create it and look its id up again.
        create_label(service, user_id, label_name)
        resolved_id = get_label_id(service, user_id, label_name)
        if not resolved_id:
            print("Failed to apply label:", label_name)
            return
        apply_label_with_retry(service, user_id, message_id, resolved_id, label_name)
    except Exception as e:
        print("Error applying or creating label:", e)
# Main function
def _ensure_label_id(service, user_id, label_name, cache):
    """Return the id of ``label_name``, creating the label when it is missing.

    Ids resolved once are kept in ``cache`` (dict: label name -> label id), so
    the label listing is requested only for labels not seen earlier in this
    run rather than several times per message. Returns None when the label
    could neither be found nor created.
    """
    label_id = cache.get(label_name)
    if label_id:
        return label_id
    label_id = get_label_id(service, user_id, label_name)
    if not label_id:
        create_label(service, user_id, label_name)
        label_id = get_label_id(service, user_id, label_name)
    if label_id:
        cache[label_name] = label_id
    return label_id


def main():
    """File every message under ``Yearly Overview/<year>/<month>``.

    Steps: load or obtain OAuth credentials (cached in ``token.json``), make
    sure the parent label exists, list all messages matching ``in:all``, read
    each message's ``Date`` header and apply the matching month label
    (creating year and month labels on demand). Messages whose date is
    missing or unparseable receive the label applied to the previous message
    (initially the parent label). Timing statistics are printed at the end.
    """
    start_time = int(time.time())  # Unix timestamp for start time

    # Load credentials from token file or authenticate
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json')
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the fresh or refreshed credentials for the next run.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    # Build Gmail service
    service = build('gmail', 'v1', credentials=creds)
    user_id = 'me'

    # Label ids resolved during this run, keyed by label name.
    label_ids = {}

    # Gmail nests labels by "/" in the name; the parent is created first.
    yearly_overview_label_id = _ensure_label_id(service, user_id, "Yearly Overview", label_ids)

    # Process emails in "All Mail"
    query = 'in:all'
    messages = get_emails(service, user_id, query)

    # Fallback label for messages without a usable date. The name is tracked
    # together with the id so the log line never refers to an unbound or
    # stale label name.
    previous_label_id = yearly_overview_label_id
    previous_label_name = "Yearly Overview"

    for message in messages:
        # Only the Date header is needed, so request metadata instead of the
        # full message body and attachments.
        msg = service.users().messages().get(
            userId=user_id,
            id=message['id'],
            format='metadata',
            metadataHeaders=['Date'],
        ).execute()
        headers = msg.get('payload', {}).get('headers', [])
        # Header names are case-insensitive ("Date", "date", "DATE").
        date_header = next(
            (header['value'] for header in headers
             if header['name'].lower() == 'date'),
            None,
        )

        label_name = extract_year_month(date_header) if date_header else None
        if not label_name:
            if date_header:
                print("Failed to determine monthly label, applying previous label...")
            else:
                print("Date header not found in email, applying previous label...")
            if previous_label_id:
                apply_label_with_retry(service, user_id, message['id'],
                                       previous_label_id, previous_label_name)
            else:
                print("No previous label found to apply.")
            continue

        # Split label_name to get year and month
        year, month = label_name.split('-')

        # The year label must exist before the month label nested under it.
        year_label_name = f"Yearly Overview/{year}"
        _ensure_label_id(service, user_id, year_label_name, label_ids)

        month_label_name = f"Yearly Overview/{year}/{month}"
        month_label_id = _ensure_label_id(service, user_id, month_label_name, label_ids)
        if not month_label_id:
            # Creation failed; keep the previous fallback label unchanged.
            print("Failed to apply label:", month_label_name)
            continue

        # Apply the month label to the email and remember it as fallback.
        apply_label_with_retry(service, user_id, message['id'], month_label_id, month_label_name)
        previous_label_id = month_label_id
        previous_label_name = month_label_name

    end_time = int(time.time())  # Unix timestamp for end time

    # Output start time and end time
    print("Start Time:", start_time)
    print("Formatted Time:", datetime.datetime.fromtimestamp(start_time).strftime("%H:%M:%S"))
    print("End Time:", end_time)
    print("Formatted Time:", datetime.datetime.fromtimestamp(end_time).strftime("%H:%M:%S"))

    # Calculate duration
    duration_seconds = end_time - start_time
    print("Duration (seconds):", duration_seconds)

    # Get user profile to retrieve total messages
    profile = service.users().getProfile(userId=user_id).execute()
    total_messages = profile['messagesTotal']
    print("Total Messages:", total_messages)

    # Calculate time taken to process one message
    if total_messages > 0:
        seconds_per_message = duration_seconds / total_messages
        # Limiting to 3 digits after the decimal point
        seconds_per_message = "{:.3f}".format(seconds_per_message)
        print("Seconds per Message:", seconds_per_message)
# Run the labelling job only when this file is executed as a script.
if __name__ == '__main__':
    main()
错误:
Traceback (most recent call last):
File "labelyear.py", line 201, in <module>
main()
File "labelyear.py", line 150, in main
year_label_id = get_label_id(service, user_id, year_label_name)
File "labelyear.py", line 71, in get_label_id
labels_list = service.users().labels().list(userId=user_id).execute()
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\googleapiclient\_helpers.py", line 130, in positional_wrapper
return wrapped(*args, **kwargs)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\googleapiclient\http.py", line 938, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://gmail.googleapis.com/gmail/v1/users/me/labels?alt=json returned "Precondition check failed.". Details: "[{'message': 'Precondition check failed.', 'domain': 'global', 'reason': 'failedPrecondition'}]">
在 Google 上搜索后发现，此错误与缺少某种委托（delegation）有关。但我是在自己的 Gmail 帐户中使用此脚本，不确定是否真的需要委托？