I'm trying to create a script that can send a large number of requests, in the range of 100,000 to 1,000,000, to a website. I'm trying to do this with asyncio and aiohttp, but somehow I can't figure out how. It looks like it creates the tasks but never finishes them, and I haven't experimented much with asyncio and aiohttp.

How does the code work? Basically, it first asks how many threads I want and what proxy type the proxies are, then goes to the start_workers function, where the tasks are supposed to be created and started. It uses an asyncio.Semaphore to limit it to 10,000 concurrent requests at a time, but that doesn't seem to work. Then, when it calls the check function, it sends a request to the website, handles the response and updates the stats. But from what I can see that isn't happening, which is why I'm here today. To monitor the checker's progress I created a function called console, which basically runs in a while True: loop and prints the progress every 2 seconds, plus another function called count_requests_per_minute, which is supposed to track roughly how many requests get completed per minute. See the code for yourself below:
import threading
import os
import time
import random
from queue import Queue
from tkinter.filedialog import askopenfilename
from tkinter import Tk
from colorama import Fore
from urllib.parse import quote
import asyncio
import aiohttp

stats_lock = asyncio.Lock()
stats = {
    'valid': 0,
    'invalid': 0,
    'twofa': 0,
    'error': 0,
    'total_checked': 0,
    'cpm': 0
}

async def check(data, proxy, stats_lock, stats, session):
    payload = {
        'data': data
    }
    headers2 = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
    }
    async with session.post("https://example_website.com", headers=headers2, data=payload, proxy=proxy) as response:
        response_text = await response.text()
        if "true" in response_text:
            stats['valid'] += 1
        elif "false" in response_text:
            stats['invalid'] += 1

async def handler(data, proxy, proxytype, stats_lock, stats, session):
    async with session:
        proxy_url = f"http://{proxy}"
        await check(data, proxy_url, stats_lock, stats, session)

async def start_workers(threads, data_queue, proxies_list, proxies_input):
    sem = asyncio.Semaphore(threads)
    console_thread = threading.Thread(target=console, args=(len(data_queue),))
    console_thread.start()
    cpm_thread = threading.Thread(target=count_requests_per_minute)
    cpm_thread.start()
    tasks = []
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(), trust_env=True) as session:
        for data in data_queue:
            task = asyncio.ensure_future(handler(data, random.choice(proxies_list), proxies_input, stats_lock, stats, session))
            tasks.append(task)
            await asyncio.sleep(0)
            async with sem:
                await task

async def main():
    data_queue = []
    root = Tk()
    root.withdraw()
    threads = int(input(f"{Fore.RED}[{Fore.WHITE}?{Fore.RED}]{Fore.WHITE} - {Fore.RED}How many threads {Fore.WHITE}:{Fore.RED}"))
    proxies_input = input(f"\n{Fore.RED}[{Fore.WHITE}!{Fore.RED}]{Fore.WHITE} > {Fore.RED}Proxies type {Fore.WHITE}| {Fore.RED}HTTP{Fore.WHITE}/{Fore.RED}SOCKS4{Fore.WHITE}/{Fore.RED}SOCKS5 {Fore.WHITE}:{Fore.RED} ")
    combo_file = askopenfilename(title="Data File", parent=root)
    with open(combo_file, "r", encoding='utf-8') as combofile:
        data_queue.extend(combofile.read().splitlines())
    proxy_file = askopenfilename(title="Proxy File")
    with open(proxy_file, "r") as proxyfile:
        proxies_list = [line.strip() for line in proxyfile]
    await start_workers(threads, data_queue, proxies_list, proxies_input)

def count_requests_per_minute():
    while True:
        time.sleep(1)
        stats['cpm'] = stats['total_checked'] * 60
        stats['total_checked'] = 0

def console(combo):
    print(f"\n{Fore.RED}[{Fore.WHITE}!{Fore.RED}]{Fore.WHITE} - {Fore.RED}Please wait while the console is loading.")
    time.sleep(10)
    os.system("cls")
    while True:
        os.system("cls")
        print(f"""
{Fore.RED}[{Fore.WHITE}Valid{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['valid']}
{Fore.RED}[{Fore.WHITE}Invalid{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['invalid']}
{Fore.RED}[{Fore.WHITE}Errors{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['error']}
{Fore.RED}[{Fore.WHITE}Checked{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['valid']+stats['invalid']}/{combo}
{Fore.RED}[{Fore.WHITE}CPM{Fore.RED}]{Fore.WHITE} | {Fore.RED}{stats['cpm']}
""")
        time.sleep(2)

if __name__ == "__main__":
    os.system("cls")
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except RuntimeError as e:
        if str(e) != "Event loop is closed":
            raise e
    except Exception:
        pass
for data in data_queue:
    task = asyncio.ensure_future(handler(data, random.choice(proxies_list), proxies_input, stats_lock, stats, session))
    tasks.append(task)
    await asyncio.sleep(0)
    async with sem:
        await task
Let me break this down for you: on every iteration you create a task (asyncio.ensure_future is essentially the legacy spelling of asyncio.create_task()), and then you immediately await that same task before creating the next one. I hope it's obvious from there that you're running one task at a time, which defeats the point of using asyncio.
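To make the one-at-a-time effect concrete, here is a minimal timing sketch; `fake_request` is a hypothetical stand-in for the real aiohttp call, using `asyncio.sleep`:

```python
import asyncio
import time

async def fake_request(i):
    # Stand-in for the aiohttp request; just sleeps 0.1 s.
    await asyncio.sleep(0.1)
    return i

async def serial():
    # The pattern from the question: create a task, then await it right away.
    for i in range(5):
        task = asyncio.ensure_future(fake_request(i))
        await task  # blocks until THIS task is done -> tasks run one at a time

async def concurrent():
    # All five "requests" run at once.
    await asyncio.gather(*(fake_request(i) for i in range(5)))

start = time.perf_counter()
asyncio.run(serial())
serial_time = time.perf_counter() - start      # roughly 5 * 0.1 s

start = time.perf_counter()
asyncio.run(concurrent())
concurrent_time = time.perf_counter() - start  # roughly 0.1 s
```

The serial version takes about five times as long, which is exactly what your loop is doing at the scale of 100,000+ requests.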
Instead, we could do something like:
await asyncio.gather(*(handler(d, ...) for d in data_queue))
Or, if you want something more dynamic:
async with asyncio.TaskGroup() as tg:
    for data in data_queue:
        tg.create_task(handler(data, ...))
As for the semaphore, the simplest option is to put it inside the task, though that obviously increases memory usage, since you need to create 100,000+ tasks up front and then keep 90%+ of them waiting.
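A sketch of that simplest option, with the semaphore acquired inside each task; `handler` here is a simplified placeholder for yours, with the network call stubbed out:

```python
import asyncio

async def handler(data, sem):
    # Acquire the semaphore inside the task, so gather() can create all
    # tasks up front while only `sem`'s limit actually run at once.
    async with sem:
        await asyncio.sleep(0)  # placeholder for check()/session.post()
        return data * 2

async def run_all(data_queue, limit=10):
    sem = asyncio.Semaphore(limit)  # you'd use 10_000 here
    return await asyncio.gather(*(handler(d, sem) for d in data_queue))

results = asyncio.run(run_all(range(100)))
```

All 100,000+ task objects still exist at once; the semaphore only bounds how many are past `async with sem` at a time.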
A better option might be to refactor so that you create 10,000 tasks that each pull data from an asyncio.Queue, then dump all the data into that queue (and have the tasks return when the queue is empty). Or here are some other examples: https://stackoverflow.com/a/48484593/3010334
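A sketch of that queue-worker refactor, with the real check() call replaced by a placeholder (`worker` and `run_all` are names I've made up for illustration):

```python
import asyncio

async def worker(queue, results):
    # Each worker pulls items until the queue is drained, then returns.
    while True:
        try:
            data = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        await asyncio.sleep(0)    # placeholder for the real check() request
        results.append(data * 2)  # placeholder "result"

async def run_all(items, n_workers=10):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    results = []
    # Only n_workers tasks ever exist, instead of one task per item.
    await asyncio.gather(*(worker(queue, results) for _ in range(n_workers)))
    return results

out = asyncio.run(run_all(range(100)))
```

With 10,000 workers and 1,000,000 items you hold 10,000 task objects in memory instead of 1,000,000, and the concurrency limit falls out of the worker count, so you don't need the semaphore at all.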
A few other minor suggestions:

- Drop `threading`. That code should be able to run as tasks, but if you really do need to do significant blocking work, try asyncio.to_thread().
- Use asyncio.run() to start your application instead of the low-level APIs (which is probably how you managed to get runtime errors that shouldn't happen).
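A minimal sketch of both suggestions together; `blocking_status` is a hypothetical stand-in for your console()/cls loop:

```python
import asyncio
import time

def blocking_status(stats):
    # Blocking work like console(); runs off the event loop in a thread.
    time.sleep(0.1)
    return dict(stats)

async def main():
    stats = {'valid': 1, 'invalid': 2}
    # Instead of spawning threading.Thread yourself, hand blocking
    # calls to asyncio's thread pool:
    snapshot = await asyncio.to_thread(blocking_status, stats)
    return snapshot

# asyncio.run() creates, runs and closes the loop for you -- no
# get_event_loop()/run_until_complete(), and no need to swallow
# "Event loop is closed" errors on exit.
result = asyncio.run(main())
```

asyncio.to_thread() needs Python 3.9+; on older versions, loop.run_in_executor() is the equivalent.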