如果要获得最佳性能,则可能要考虑使用异步I / O而不是线程。与数以千计的OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在其之上增加了更多。线程肯定可以完成工作,但是我怀疑异步路由会提供更好的整体性能。
我正在打开一个具有100,000个URL的文件。我需要向每个URL发送一个HTTP请求并打印状态代码。我正在使用Python 2.6,到目前为止,我们研究了Python实现线程/并发性的许多令人困惑的方式。我什至看过python concurrence库,但无法弄清楚如何正确编写该程序。有没有人遇到过类似的问题?我想通常我需要知道如何尽快地在Python中执行数千个任务-我想这意味着“同时”。
无扭转解决方案:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
这比扭曲的解决方案要快一点,并且使用更少的CPU。
使用thread pool是一个不错的选择,这将使之非常容易。不幸的是,python没有使线程池变得异常简单的标准库。但是这里有一个不错的图书馆,应该可以帮助您入门:http://www.chrisarndt.de/projects/threadpool/
创建epoll
对象,打开许多客户端TCP套接字,将其发送缓冲区调整为比请求标头多一点,发送请求标头-应该立即发送,仅放入缓冲区中,在epoll
对象中注册套接字,在.poll
上执行epoll
,从.poll
的每个套接字读取前3个字节,将它们写入sys.stdout
,然后写入\n
(请勿刷新),关闭客户端套接字。
对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块,例如Queue,可能会有所帮助。
考虑使用Windmill,尽管Windmill可能无法执行那么多线程。
这个扭曲的异步Web客户端运行得很快。
最简单的方法是使用Python的内置线程库。 它们不是“真实的” /内核线程
使用tornado异步网络库的解决方案
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
自2010年发布以来,情况发生了很大变化,我还没有尝试所有其他答案,但是我尝试了一些答案,我发现使用python3.6对我来说最有效。
我能够在AWS上每秒获取约150个唯一域。
import pandas as pd
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
线程在这里绝对不是答案。如果总体目标是“最快的方法”,它们将提供进程和内核瓶颈,以及吞吐量限制,这是不可接受的。
twisted
和它的异步HTTP
客户端会为您带来更好的结果。
使用grequests,它是请求+ Gevent模块的组合。
GRequests允许您将Requests与Gevent一起使用,以轻松地发出异步HTTP请求。
用法很简单:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
创建一组未发送的请求:
>>> rs = (grequests.get(u) for u in urls)
同时发送它们:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
解决此问题的一种好方法是先编写获得一个结果所需的代码,然后合并线程代码以并行化应用程序。
在理想情况下,这仅意味着同时启动100,000个线程,这些线程将其结果输出到字典或列表中以供以后处理,但是实际上,您以这种方式可以发出多少个并行HTTP请求受到限制。在本地,您可以同时打开多少个套接字,Python解释器允许多少个执行线程受到限制。从远程来看,如果所有请求都针对一台或多台服务器,则同时连接的数量可能会受到限制。这些限制可能需要您编写脚本,以便在任何时候仅轮询一小部分网址(如另一位发帖人所述,100可能是一个不错的线程池大小,尽管您可能会发现自己可以成功部署更多)。
您可以按照此设计模式来解决上述问题:
list
或dict
,则可以safely append or insert unique items from your threads without locks,但是如果您写入文件或需要更复杂的跨线程数据交互您应使用互斥锁以保护此状态免遭损坏]] ..>我建议您使用threading模块。您可以使用它来启动和跟踪正在运行的线程。 Python的线程支持是裸露的,但是对问题的描述表明它完全可以满足您的需求。
最后,如果您希望看到用Python编写的并行网络应用程序的简单明了的应用程序,请查看ssh.py。这是一个小型库,使用Python线程并行化许多SSH连接。该设计足够接近您的要求,您可能会发现它是很好的资源。
如果要获得最佳性能,则可能要考虑使用异步I / O而不是线程。与数以千计的OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在其之上增加了更多。线程肯定可以完成工作,但是我怀疑异步路由会提供更好的整体性能。
特别是,我建议Twisted库(http://www.twistedmatrix.com)中的异步Web客户端。它具有公认的陡峭的学习曲线,但是一旦您掌握了Twisted的异步编程风格,就很容易使用。
在Twisted的异步Web客户端API上的HowTo可在以下位置找到:
http://twistedmatrix.com/documents/current/web/howto/client.html
解决方案:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
测试时间:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
我知道这是一个老问题,但是在Python 3.7中,您可以使用asyncio
和aiohttp
来做到这一点。
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
您可以阅读有关它的更多信息,并查看示例here。
使用thread pool是一个不错的选择,这将使之非常容易。不幸的是,python没有使线程池变得异常简单的标准库。但是这里有一个不错的图书馆,应该可以帮助您入门:http://www.chrisarndt.de/projects/threadpool/
他们所在站点的代码示例:
pool = ThreadPool(poolsize) requests = makeRequests(some_callable, list_of_args, callback) [pool.putRequest(req) for req in requests] pool.wait()
希望这会有所帮助。
创建epoll
对象,打开许多客户端TCP套接字,将其发送缓冲区调整为比请求标头多一点,发送请求标头-应该立即发送,仅放入缓冲区中,在epoll
对象中注册套接字,在.poll
上执行epoll
,从.poll
的每个套接字读取前3个字节,将它们写入sys.stdout
,然后写入\n
(请勿刷新),关闭客户端套接字。
限制同时打开的套接字数-创建套接字时处理错误。仅当另一个插座关闭时才创建一个新的插座。调整操作系统限制。尝试分叉到几个(不是很多)进程:这可能有助于更有效地使用CPU。
对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块,例如Queue,可能会有所帮助。
以前我通过并行下载文件做过类似的事情,这对我来说已经足够好了,但是您谈论的范围并不大。
如果您的任务受CPU限制较大,则可能需要查看multiprocessing模块,该模块将允许您利用更多的CPU /核心/线程(更多的进程不会互相阻塞,因为锁定是基于过程)
考虑使用Windmill,尽管Windmill可能无法执行那么多线程。
您可以在5台计算机上使用手动滚动的Python脚本来完成此操作,每台计算机都使用端口40000-60000连接出站,从而打开100,000个端口连接。
[此外,使用线程良好的QA应用程序(例如OpenSTA进行示例测试,以了解每个服务器可以处理的数量。
另外,请尝试仅将简单的Perl与LWP :: ConnCache类一起使用。这样,您可能会获得更多的性能(更多的连接)。
这个扭曲的异步Web客户端运行得很快。
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
最简单的方法是使用Python的内置线程库。 它们不是“真实的” /内核线程
如果要获得最佳性能,则可能要考虑使用异步I / O而不是线程。与数以千计的OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在其之上增加了更多。线程肯定可以完成工作,但是我怀疑异步路由会提供更好的整体性能。
解决方案:
我知道这是一个老问题,但是在Python 3.7中,您可以使用asyncio
和aiohttp
来做到这一点。