用Python发送100,000个HTTP请求的最快方法是什么?

问题描述 投票:258回答:15

我正在打开一个具有100,000个URL的文件。我需要向每个URL发送一个HTTP请求并打印状态代码。我正在使用Python 2.6,到目前为止,我们研究了Python实现线程/并发性的许多令人困惑的方式。我什至看过python concurrence库,但无法弄清楚如何正确编写该程序。有没有人遇到过类似的问题?我想通常我需要知道如何尽快地在Python中执行数千个任务-我想这意味着“同时”。

python http concurrency
15个回答
192
投票

无扭转解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这比扭曲的解决方案要快一点,并且使用更少的CPU。


1
投票

使用thread pool是一个不错的选择,这将使之非常容易。不幸的是,python没有使线程池变得异常简单的标准库。但是这里有一个不错的图书馆,应该可以帮助您入门:http://www.chrisarndt.de/projects/threadpool/


1
投票

创建epoll对象,打开许多客户端TCP套接字,将其发送缓冲区调整为比请求标头多一点,发送请求标头-应该立即发送,仅放入缓冲区中,在epoll对象中注册套接字,在.poll上执行epoll,从.poll的每个套接字读取前3个字节,将它们写入sys.stdout,然后写入\n(请勿刷新),关闭客户端套接字。


0
投票

对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块,例如Queue,可能会有所帮助。


0
投票

考虑使用Windmill,尽管Windmill可能无法执行那么多线程。


0
投票

这个扭曲的异步Web客户端运行得很快。


-2
投票

最简单的方法是使用Python的内置线程库。 它们不是“真实的” /内核线程


51
投票

使用tornado异步网络库的解决方案

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

40
投票

自2010年发布以来,情况发生了很大变化,我还没有尝试所有其他答案,但是我尝试了一些答案,我发现使用python3.6对我来说最有效。

我能够在AWS上每秒获取约150个唯一域。

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

38
投票

线程在这里绝对不是答案。如果总体目标是“最快的方法”,它们将提供进程和内核瓶颈,以及吞吐量限制,这是不可接受的。

twisted和它的异步HTTP客户端会为您带来更好的结果。


16
投票

使用grequests,它是请求+ Gevent模块的组合。

GRequests允许您将Requests与Gevent一起使用,以轻松地发出异步HTTP请求。

用法很简单:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

创建一组未发送的请求:

>>> rs = (grequests.get(u) for u in urls)

同时发送它们:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

8
投票

解决此问题的一种好方法是先编写获得一个结果所需的代码,然后合并线程代码以并行化应用程序。

在理想情况下,这仅意味着同时启动100,000个线程,这些线程将其结果输出到字典或列表中以供以后处理,但是实际上,您以这种方式可以发出多少个并行HTTP请求受到限制。在本地,您可以同时打开多少个套接字,Python解释器允许多少个执行线程受到限制。从远程来看,如果所有请求都针对一台或多台服务器,则同时连接的数量可能会受到限制。这些限制可能需要您编写脚本,以便在任何时候仅轮询一小部分网址(如另一位发帖人所述,100可能是一个不错的线程池大小,尽管您可能会发现自己可以成功部署更多)。

您可以按照此设计模式来解决上述问题:

  1. 启动一个线程以启动新的请求线程,直到当前正在运行的线程数(您可以通过threading.active_count()或通过将线程对象推入数据结构来跟踪它们)> =您的同时请求的最大数量(例如100),然后短暂睡眠。没有更多URL可处理时,该线程应终止。因此,线程将不断唤醒,启动新线程并进入睡眠状态,直到完成为止。
  2. 已将请求线程的结果存储在某种数据结构中,以供以后检索和输出。如果要在其中存储结果的结构是CPython中的listdict,则可以safely append or insert unique items from your threads without locks,但是如果您写入文件或需要更复杂的跨线程数据交互您应使用互斥锁以保护此状态免遭损坏]] ..>

    我建议您使用threading模块。您可以使用它来启动和跟踪正在运行的线程。 Python的线程支持是裸露的,但是对问题的描述表明它完全可以满足您的需求。

  3. 最后,如果您希望看到用Python编写的并行网络应用程序的简单明了的应用程序,请查看ssh.py。这是一个小型库,使用Python线程并行化许多SSH连接。该设计足够接近您的要求,您可能会发现它是很好的资源。

如果要获得最佳性能,则可能要考虑使用异步I / O而不是线程。与数以千计的OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在其之上增加了更多。线程肯定可以完成工作,但是我怀疑异步路由会提供更好的整体性能。

特别是,我建议Twisted库(http://www.twistedmatrix.com)中的异步Web客户端。它具有公认的陡峭的学习曲线,但是一旦您掌握了Twisted的异步编程风格,就很容易使用。

在Twisted的异步Web客户端API上的HowTo可在以下位置找到:

http://twistedmatrix.com/documents/current/web/howto/client.html

解决方案:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

测试时间:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

我知道这是一个老问题,但是在Python 3.7中,您可以使用asyncioaiohttp来做到这一点。

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

您可以阅读有关它的更多信息,并查看示例here

使用thread pool是一个不错的选择,这将使之非常容易。不幸的是,python没有使线程池变得异常简单的标准库。但是这里有一个不错的图书馆,应该可以帮助您入门:http://www.chrisarndt.de/projects/threadpool/

他们所在站点的代码示例:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

希望这会有所帮助。

创建epoll对象,打开许多客户端TCP套接字,将其发送缓冲区调整为比请求标头多一点,发送请求标头-应该立即发送,仅放入缓冲区中,在epoll对象中注册套接字,在.poll上执行epoll,从.poll的每个套接字读取前3个字节,将它们写入sys.stdout,然后写入\n(请勿刷新),关闭客户端套接字。

限制同时打开的套接字数-创建套接字时处理错误。仅当另一个插座关闭时才创建一个新的插座。调整操作系统限制。尝试分叉到几个(不是很多)进程:这可能有助于更有效地使用CPU。

对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块,例如Queue,可能会有所帮助。

以前我通过并行下载文件做过类似的事情,这对我来说已经足够好了,但是您谈论的范围并不大。

如果您的任务受CPU限制较大,则可能需要查看multiprocessing模块,该模块将允许您利用更多的CPU /核心/线程(更多的进程不会互相阻塞,因为锁定是基于过程)

考虑使用Windmill,尽管Windmill可能无法执行那么多线程。

您可以在5台计算机上使用手动滚动的Python脚本来完成此操作,每台计算机都使用端口40000-60000连接出站,从而打开100,000个端口连接。

[此外,使用线程良好的QA应用程序(例如OpenSTA进行示例测试,以了解每个服务器可以处理的数量。

另外,请尝试仅将简单的Perl与LWP :: ConnCache类一起使用。这样,您可能会获得更多的性能(更多的连接)。

这个扭曲的异步Web客户端运行得很快。

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

最简单的方法是使用Python的内置线程库。 它们不是“真实的” /内核线程

它们有问题(例如序列化),但是足够好。您需要一个队列和线程池。一个选项是here,但是编写自己的选项很简单。您无法并行处理所有100,000个呼叫,但可以同时触发100个(或大约)呼叫。

7
投票

如果要获得最佳性能,则可能要考虑使用异步I / O而不是线程。与数以千计的OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在其之上增加了更多。线程肯定可以完成工作,但是我怀疑异步路由会提供更好的整体性能。


6
投票

解决方案:


5
投票

我知道这是一个老问题,但是在Python 3.7中,您可以使用asyncioaiohttp来做到这一点。

© www.soinside.com 2019 - 2024. All rights reserved.