Telethon 模块出现 FloodWaitError

问题描述 投票:0回答:1

当我运行下面的脚本时遇到“FloodWaitError”问题。

此脚本的目的是根据数据库中指定的 URL 检查 Telegram 组名称,并相应地更新数据库中组的状态。目的是检查链条是否存在,然后报废。

我尝试使用“generate_random_delay”函数向 Telegram API 引入随机请求提交。当引发“FloodWaitError”时,“handle_flood_wait_error”函数会检索延迟并向洪水延迟添加额外的随机延迟。

但是,我仍然面临洪水问题。您有解决这个问题的想法吗?

# Checking Telegram group names
"""
Name: check_telegram_group.py
Description: This script checks the names of Telegram groups from specified URLs in a database and updates the status of the groups in the database accordingly.
Date: 26/03/2024
Operation:
    1. Import necessary modules, including logging, SQLite, asyncio, telethon, and other modules needed for error handling and Telegram operations.
    2. Define custom exception classes InvalidURLException and GroupNotFoundException to handle specific errors related to invalid URLs or not found groups.
    3. Define the generate_random_delay function to generate a random delay according to an exponential distribution. This helps manage requests to the Telegram API to avoid rate limits.
    4. Define the handle_flood_wait_error function to handle flood wait errors. This function logs a warning and pauses program execution for the specified time in the error.
    5. Define the get_entity_with_timeout function to retrieve an entity (group) with a time limit. This function handles timeout errors and potential exceptions when retrieving the entity.
    6. Define the get_group_name_from_url function to retrieve the name of a group from a given URL. This function uses get_entity_with_timeout to retrieve the corresponding entity and then gets the group name from that entity.
    7. Define the batch_get_group_names_from_urls function to retrieve group names for a list of URLs. This function uses get_entity_with_timeout to retrieve the corresponding entities and then gets the group names from those entities.
    8. Define the write_group_list_to_db function to write the updated list of groups to a SQLite database.
    9. Define the handle_exit function to handle program exit on SIGINT (CTRL+C) signal.
    10. Define the main function. This function reads group URLs from a SQLite database, retrieves the corresponding group names, updates the groups' status in the database, and handles potential errors and exceptions.
    11. Load API credentials from the .env file.
    12. Execute the main function using asyncio.run() to run the main application loop.
"""

import logging
import os
import asyncio
import sqlite3
import telethon
from collections import deque
import signal
import random
import math
from datetime import datetime
from dotenv import load_dotenv
from telethon.tl import types as tl_types
from telethon import TelegramClient, errors
from telethon.tl.functions.messages import GetMessagesRequest
from telethon.tl.functions.channels import GetFullChannelRequest, GetMessagesRequest

# Set logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
logging.getLogger().setLevel(logging.WARNING)

# Set log format
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Configure file handler for logging
log_handler = logging.FileHandler('log/check_telegram_group.log')
log_handler.setFormatter(log_formatter)
logging.getLogger().addHandler(log_handler)

# Define class for invalid URL
class InvalidURLException(Exception):
    """Exception raised for invalid URL."""
    pass

# Define class for group not found
class GroupNotFoundException(Exception):
    """Exception raised if group is not found."""
    pass

# Function to generate random delay according to exponential distribution
def generate_random_delay(lambd):
    """Generate random delay according to exponential distribution."""
    return -math.log(1 - random.random()) / lambd

# Function to handle flood wait errors
async def handle_flood_wait_error(error, url):
    """Handle flood wait errors."""
    wait_time = error.seconds
    wait_time += generate_random_delay(1/30)
    logging.warning(f"Pausing for {wait_time} seconds due to flood wait on {url}")
    await asyncio.sleep(wait_time)

# Function to retrieve entity with timeout
async def get_entity_with_timeout(client: TelegramClient, url: str, timeout: int = 10):
    """Retrieve entity with timeout."""
    try:
        await asyncio.sleep(generate_random_delay(1/30))  # Add random delay
        entity = await asyncio.wait_for(client.get_entity(url), timeout=timeout)
        return entity
    except asyncio.TimeoutError:
        raise TimeoutError(f"Timeout while retrieving entity for URL: {url}")
    except errors.FloodWaitError as e:
        await handle_flood_wait_error(e, url)
    except Exception as e:
        raise e

# Function to get group name from URL
async def get_group_name_from_url(client, url: str, max_attempts: int = 5):
    """
    Get group name from URL.

    :param url: URL of the Telegram group.
    :param max_attempts: Maximum number of retrieval attempts.
    :return: The group name, or None if error.
    """
    if not url.startswith("https://t.me/") and not url.startswith("https://telegram.me/"):
        logging.warning(f"Invalid URL: {url}. URL must start with 'https://t.me/' or 'https://telegram.me/'.")
        return None

    attempts = 0
    while attempts < max_attempts:
        try:
            entity = await get_entity_with_timeout(client, url, timeout=10)
            full_channel = await client(GetFullChannelRequest(channel=entity))
            group_name = full_channel.chats[0].title
            return group_name
        except errors.FloodWaitError as e:
            await handle_flood_wait_error(e, url)
            attempts += 1
            if attempts >= max_attempts:
                logging.error(f"Maximum number of attempts reached for {url}. Exiting.")
                return None
        except ValueError:
            raise InvalidURLException(f"Invalid URL: {url}")
        except TimeoutError:
            logging.warning(f"Timeout while retrieving group for URL: {url}")
            return None
        except telethon.errors.rpcerrorlist.UsernameInvalidError as e:
            logging.warning(f"Invalid URL: {url}. URL must correspond to a public group on Telegram.")
            return None
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            return None

# Function to get group names for a list of URLs
async def batch_get_group_names_from_urls(urls: list, max_attempts: int = 5):
    """
    Get group names for a list of URLs.

    :param urls: List of URLs of Telegram groups.
    :param max_attempts: Maximum number of retrieval attempts.
    :return: List of corresponding group names.
    """
    attempts = 0
    async with TelegramClient('session_name', api_id, api_hash) as client:
        while attempts < max_attempts:
            try:
                entities = await asyncio.gather(*(get_entity_with_timeout(client, url) for url in urls))
                valid_entities = [entity for entity in entities if entity is not None]

                if not valid_entities:
                    return []

                futures = [client(GetMessagesRequest(
                    id=[entity.id],
                    channel=entity,
                    count=1
                )) for entity in valid_entities]

                full_channels = await asyncio.gather(*futures)

                group_names = [full_channel.chats[0].title if full_channel.chats else None for full_channel in full_channels]

                group_names = [name for name in group_names if name]

                return group_names
            except errors.FloodWaitError as e:
                await handle_flood_wait_error(e, urls)
                attempts += 1
                if attempts >= max_attempts:
                    logging.error(f"Maximum number of attempts reached for {urls}. Exiting.")
                    break
            except Exception as e:
                raise e

# Function to write updated group list to SQLite database
# def write_group_list_to_db(group_list, db_path):
#     """Write updated group list to SQLite database."""
#     try:
#         conn = sqlite3.connect(db_path)
#         c = conn.cursor()
#         c.execute("CREATE TABLE IF NOT EXISTS APT_telegram_group_list (telegram_link TEXT, status BOOLEAN)")
#         c.execute("DELETE FROM APT_telegram_group_list")  # Delete old data
#         for group_info in group_list:
#             c.execute("INSERT INTO APT_telegram_group_list (telegram_link, status) VALUES (?, ?)", (group_info['telegram_link'], group_info['status']))
#         conn.commit()
#         conn.close()
#     except Exception as e:
#         print(f"Error writing to SQLite database: {e}")
#         exit(1)

# Function to handle program exit
def handle_exit(signum: int, frame: 'FrameType') -> None:
    """Signal handling function for program exit."""
    print("Exiting program...")
    logging.info("Exiting program...")
    exit(0)

# Associate signal handling function with SIGINT signal (CTRL+C)
signal.signal(signal.SIGINT, handle_exit)

# Main program function
async def main() -> None:
    """
    Main program function.
    """
    print("Welcome to the Telegram group name checking program.")
    logging.info("Welcome to the Telegram group name checking program.")

    # SQLite database path
    db_path = 'APT_telegram_group.db'

    try:
        # Connect to SQLite database
        conn = sqlite3.connect(db_path)
        c = conn.cursor()

        # Read URLs from SQLite database and randomly shuffle
        c.execute("SELECT telegram_link FROM APT_telegram_group_list")
        rows = c.fetchall()
        random.shuffle(rows)
        group_list = [{'telegram_link': row[0], 'status': False} for row in rows]

        conn.close()
    except Exception as e:
        print(f"Error reading from SQLite database: {e}")
        exit(1)

    groups_to_remove = []

    async with TelegramClient('session_name', api_id, api_hash) as client:
        for group_info in group_list:
            group_url = group_info['telegram_link']

            try:
                group_name = await get_group_name_from_url(client, group_url)

                if group_name:
                    print(f"Group name for {group_url} is: {group_name}")
                    logging.info(f"Group name for {group_url} is: {group_name}")
                    group_info['status'] = True
                else:
                    print(f"Unable to get group name for {group_url}.")
                    logging.warning(f"Unable to get group name for {group_url}.")
                    group_info['status'] = False
                    groups_to_remove.append(group_info)
            except InvalidURLException as e:
                print(f"Error: {e}")
                logging.error(str(e))
                continue
            except GroupNotFoundException as e:
                print(f"Error: {e}")
                logging.warning(str(e))
                group_info['status'] = False
                groups_to_remove.append(group_info)
            except TimeoutError as e:
                print(f"Timeout: {e}")
                logging.warning(str(e))
                break
            except Exception as e:
                raise e

    # Remove groups that couldn't be retrieved
    # for group_info_to_remove in groups_to_remove:
    #     if not group_info_to_remove['status']:
    #         logging.error(f"Group {group_info_to_remove['telegram_link']} has been removed.")
    #         print(f"Group {group_info_to_remove['telegram_link']} has been removed.")
    #         group_list.remove(group_info_to_remove)

    # Write updated list to SQLite database
    # write_group_list_to_db(group_list, db_path)

    print("Exiting Telegram group name checking program.")
    logging.info("Exiting Telegram group name checking program.")

if __name__ == "__main__":
    # Load API credentials from .env file
    load_dotenv()
    api_id = os.getenv("API_ID")
    api_hash = os.getenv("API_HASH")

    if api_id is None or api_hash is None:
        raise ValueError("API_ID and API_HASH must be defined in the .env file")

    api_id = int(api_id)

    # Execute main function
    asyncio.run(main())
python-3.x telegram telethon
1个回答
0
投票

Telegram 对用户机器人非常敏感,每个请求之间至少应该有 20、30 秒以上的延迟。这样您就不会再次遭受洪水等待。

© www.soinside.com 2019 - 2024. All rights reserved.