Asyncio 发送数千个请求

问题描述 投票:0回答:1

我正在尝试创建一个脚本,可以向网站发送 100000 到 1.000.000 范围内的大量请求。我正在尝试使用 asyncio 和 aiohttp 来执行此操作,但不知何故我不知道如何执行此操作。看起来它正在创建任务但没有完成它,而且我对 asyncio 和 aiohttp 没有进行太多实验。

代码如何工作?基本上,它首先询问我想要多少线程以及代理的代理类型如何,然后转到

start_workers
函数,在该函数中应该创建任务并启动。它使用
asyncio.semaphore
将其限制为一次 10.000 个并发请求,但它似乎不起作用。然后,当它调用检查函数时,它会向网站发送请求,处理响应并更新统计信息。但从我看来并非如此,这就是我今天来到这里的原因。为了检查检查器的工作情况,我创建了一个名为 console 的函数,该函数基本上在 while true: 循环中运行,以检查每 2 秒的进度,还有另一个名为
count_requests_per_minute
的函数,它应该检查可以有多少请求大约在一分钟内完成。自行查看代码如下:

import threading
import os
import time
import random
from queue import Queue
from tkinter.filedialog import askopenfilename
from tkinter import Tk
from colorama import Fore
from urllib.parse import quote
import asyncio
import aiohttp
stats_lock = asyncio.Lock()
stats = {
    'valid': 0,
    'invalid': 0,
    'twofa': 0,
    'error': 0,
    'total_checked': 0,
    'cpm': 0
}


async def check(data, proxy, stats_lock, stats, session):
    

    payload = {
        'data':data
    }

    headers2 = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
    }
    async with session.post("https://example_website.com", headers=headers2, data=payload, proxy=proxy) as response:
        response_text = await response.text()
        if "true" in response_text:
            stats['valid'] +=1
        elif "false" in response_text:
            stats['invalid']+=1


async def handler(data, proxy, proxytype, stats_lock, stats, session):
    async with session:
        proxy_url = f"http://{proxy}"
        await check(data, proxy_url, stats_lock, stats, session)


async def start_workers(threads, data_queue, proxies_list, proxies_input):
    sem = asyncio.Semaphore(threads)

    console_thread = threading.Thread(target=console, args=(len(data_queue),))
    console_thread.start()
    
    cpm_thread = threading.Thread(target=count_requests_per_minute) 
    cpm_thread.start()

    tasks = []
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(), trust_env=True) as session:
        for data in data_queue:
            task = asyncio.ensure_future(handler(data, random.choice(proxies_list), proxies_input, stats_lock, stats, session))
            tasks.append(task)
            await asyncio.sleep(0)  
            async with sem:
                await task

async def main():
    data_queue = []

    root = Tk()
    root.withdraw()

    threads = int(input(f"{Fore.RED}[{Fore.WHITE}?{Fore.RED}]{Fore.WHITE} - {Fore.RED}How many threads {Fore.WHITE}:{Fore.RED}"))

    proxies_input = input(f"\n{Fore.RED}[{Fore.WHITE}!{Fore.RED}]{Fore.WHITE} > {Fore.RED}Proxies type {Fore.WHITE}| {Fore.RED}HTTP{Fore.WHITE}/{Fore.RED}SOCKS4{Fore.WHITE}/{Fore.RED}SOCKS5 {Fore.WHITE}:{Fore.RED} ")

    combo_file = askopenfilename(title="Data File", parent=root)
    with open(combo_file, "r", encoding='utf-8') as combofile:
        data_queue.extend(combofile.read().splitlines())

    proxy_file = askopenfilename(title="Proxy File")
    with open(proxy_file, "r") as proxyfile:
        proxies_list = [line.strip() for line in proxyfile]

    await start_workers(threads, data_queue, proxies_list, proxies_input)


def count_requests_per_minute():
    while True:
        time.sleep(1)
        stats['cpm'] = stats['total_checked'] * 60
        stats['total_checked'] = 0


def console(combo):
    print(f"\n{Fore.RED}[{Fore.WHITE}!{Fore.RED}]{Fore.WHITE} - {Fore.RED}Please wait while the console is loading.")
    time.sleep(10)
    os.system("cls")
    while True:
        os.system("cls")
        print(f"""
{Fore.RED}[{Fore.WHITE}Valid{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['valid']}

{Fore.RED}[{Fore.WHITE}Invalid{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['invalid']}

{Fore.RED}[{Fore.WHITE}Errors{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['error']}

{Fore.RED}[{Fore.WHITE}Checked{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['valid']+stats['invalid']}/{combo}

{Fore.RED}[{Fore.WHITE}CPM{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['cpm']}
""")
        time.sleep(2)


if __name__ == "__main__":
    os.system("cls")
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except RuntimeError as e:
        if str(e) != "Event loop is closed":
            raise e  
    except Exception:
        pass

python multithreading python-requests python-asyncio aiohttp
1个回答
0
投票
        for data in data_queue:
            task = asyncio.ensure_future(handler(data, random.choice(proxies_list), proxies_input, stats_lock, stats, session))
            tasks.append(task)
            await asyncio.sleep(0)  
            async with sem:
                await task

让我为您分解一下:

  1. 创建一个任务并安排它在事件循环中运行(实际上应该使用
    asyncio.create_task()
    )。
  2. 将任务添加到列表中。
  3. 屈服于事件循环,此时任务将在返回到此函数之前启动。
  4. 抓住信号量。它的限制为 10,000,我们是唯一使用它的代码,所以它是 1/10000。
  5. 等待任务完成。
  6. 释放信号量,所以我们回到0/10000。
  7. 重复步骤 1。

我希望从那里可以很明显地看出,您一次运行一个任务,从而消除了使用 asyncio 的意义。

相反,我们可以这样做:

await asyncio.gather(*(handler(d, ...) for d in data_queue))

或者,如果您想要更有活力的东西:

async with asyncio.TaskGroup() as tg:
    for data in data_queue:
        tg.create_task(handler(data, ...))

对于信号量,最简单的选择是将其放入任务中,但这显然会增加内存使用量,因为您需要预先创建 100,000 多个任务,然后等待 90% 以上。

更好的选择可能是重构,以便创建 10,000 个任务,这些任务从

asyncio.Queue
提取数据,然后将所有数据转储到该队列中(并且当队列为空时让任务返回)。或者,这里还有一些其他示例:https://stackoverflow.com/a/48484593/3010334


其他一些小建议:

  • 请勿在您的应用程序中使用
    threading
    。该代码应该可以作为任务运行,但如果您确实需要执行重要的阻塞操作,请尝试
    asyncio.to_thread()
  • 使用
    asyncio.run()
    启动您的应用程序,而不是使用低级 API(这可能是您设法获得一些不应该发生的运行时错误的原因)。
© www.soinside.com 2019 - 2024. All rights reserved.