我有一个〜300K的URL列表,我需要从中获取数据。
API限制是每秒100次调用。
我已经为异步创建了一个类,但是这样做很快,我在API上遇到了错误。
如何减慢异步速度,以便每秒可以进行100次调用?
import grequests
lst = ['url.com','url2.com']
class Test:
def __init__(self):
self.urls = lst
def exception(self, request, exception):
print ("Problem: {}: {}".format(request.url, exception))
def async(self):
return grequests.map((grequests.get(u) for u in self.urls), exception_handler=self.exception, size=5)
def collate_responses(self, results):
return [x.text for x in results]
test = Test()
#here we collect the results returned by the async function
results = test.async()
response_text = test.collate_responses(results)
我采取的第一步是创建一个每t ms最多可以分配n个硬币的对象。
import time
class CoinsDistribution:
"""Object that distribute a maximum of maxCoins every timeLimit ms"""
def __init__(self, maxCoins, timeLimit):
self.maxCoins = maxCoins
self.timeLimit = timeLimit
self.coin = maxCoins
self.time = time.perf_counter()
def getCoin(self):
if self.coin <= 0 and not self.restock():
return False
self.coin -= 1
return True
def restock(self):
t = time.perf_counter()
if (t - self.time) * 1000 < self.timeLimit:
return False
self.coin = self.maxCoins
self.time = t
return True
现在我们需要一种强制函数的方法,只有在他们能得到硬币时才被调用。为此,我们可以编写一个我们可以使用的装饰器函数:
@limitCalls(callLimit=1, timeLimit=1000)
def uniqFunctionRequestingServer1():
return 'response from s1'
但有时,多个函数正在调用请求相同的服务器,因此我们希望它们从同一个CoinsDistribution对象中获取硬币。因此,装饰器的另一个用途是提供CoinsDistribution对象:
server_2_limit = CoinsDistribution(3, 1000)
@limitCalls(server_2_limit)
def sendRequestToServer2():
return 'it worked !!'
@limitCalls(server_2_limit)
def sendAnOtherRequestToServer2():
return 'it worked too !!'
我们现在必须创建装饰器,它可以使用CoinsDistribution对象或足够的数据来创建新的。
import functools
def limitCalls(obj=None, *, callLimit=100, timeLimit=1000):
if obj is None:
obj = CoinsDistribution(callLimit, timeLimit)
def limit_decorator(func):
@functools.wraps(func)
def limit_wrapper(*args, **kwargs):
if obj.getCoin():
return func(*args, **kwargs)
return 'limit reached, please wait'
return limit_wrapper
return limit_decorator
它已经完成了!现在,您可以限制您使用的任何API的调用次数,并且您可以构建一个字典来跟踪您的CoinsDistribution对象,如果您必须管理它们(不同的API端点或不同的API)。
注意:如果没有可用的硬币,我选择返回错误消息。您应该根据需要采用此行为。
您可以跟踪已经过了多少时间,并决定是否要执行更多请求。
这将每秒打印100个数字,例如:
from datetime import datetime
import time
start = datetime.now()
time.sleep(1);
counter = 0
while (True):
end = datetime.now()
s = (end-start).seconds
if (counter >= 100):
if (s <= 1):
time.sleep(1) # You can keep track of the time and sleep less, actually
start = datetime.now()
counter = 0
print(counter)
counter += 1
This SO中的其他问题显示了如何做到这一点。顺便说一下,你所需要的通常称为节流。