Alongside requests, I have been experimenting with concurrent.futures in order to send several different direct messages from several different users. The purpose of the app I am building is to send these direct messages as quickly as possible, and sending each request individually takes too long.
The code below is what I have tried, but I can clearly see that futures is not picking up the requests stored in the array.
Any advice on how to do this would be much appreciated.
from concurrent import futures
import requests
from requests_oauthlib import OAuth1
import json
from datetime import datetime

startTime = datetime.now()

URLS = ['https://api.twitter.com/1.1/direct_messages/new.json'] * 1

def get_oauth():
    oauth = OAuth1("xxxxxx",
                   client_secret="zzzxxxx",
                   resource_owner_key="xxxxxxxxxxxxxxxxxx",
                   resource_owner_secret="xxxxxxxxxxxxxxxxxxxx")
    return oauth

oauth = get_oauth()
req = []

def load_url(url, timeout):
    req.append(requests.post(url, data={'screen_name': 'vancephuoc', 'text': 'hello pasdfasasdfdasdfasdffpls 1 2 3 4 5'}, auth=oauth, stream=True, timeout=timeout))
    req.append(requests.post(url, data={'screen_name': 'vancephuoc', 'text': 'hello this is tweetnumber2 1 2 3 4 5 7'}, auth=oauth, stream=True, timeout=timeout))

with futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_to_url = dict((executor.submit(req, url, 60), url)
                         for url in URLS)
    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        print("DM SENT IN")
        print(datetime.now() - startTime)
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
It may be worth looking at some existing libraries that try to simplify concurrent use of requests.
From: http://docs.python-requests.org/en/latest/user/advanced/#blocking-or-non-blocking
[..] there are lots of projects out there that combine Requests with one of Python's asynchronicity frameworks. Two excellent examples are grequests and requests-futures.
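If you stay with the standard library instead, the key point is to submit the worker function itself (with its arguments) to the executor, rather than a list of already-made requests; `as_completed` then yields each future as it finishes. A minimal sketch of that pattern, using a stand-in `send_dm` worker instead of real network calls (the names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Stand-in for the real sending function; in practice this would
# call requests.post(...) and return the response.
def send_dm(message):
    return f"sent: {message}"

messages = ["hello 1", "hello 2", "hello 3"]
results = []

with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit the callable plus its arguments -- the executor calls it for us.
    future_to_msg = {executor.submit(send_dm, m): m for m in messages}
    for future in as_completed(future_to_msg):
        msg = future_to_msg[future]
        try:
            results.append(future.result())
        except Exception as exc:
            print(f"{msg!r} generated an exception: {exc}")

print(sorted(results))
```

Each call runs on its own thread, so the total wall-clock time is roughly that of the slowest single request rather than the sum of all of them.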
The pattern here differs slightly from the docs and is a bit easier to read. It can be used as boilerplate to quickly request and process data:
import pickle
import concurrent.futures

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

s = requests.Session()
# s.proxies = proxies  # optional: set if your traffic goes through proxies

# Retry transient server errors with a small backoff.
retries = Retry(total=5,
                backoff_factor=0.1,
                status_forcelist=[500, 502, 503, 504])
s.mount('http://', HTTPAdapter(max_retries=retries))
s.mount('https://', HTTPAdapter(max_retries=retries))

output_data = []

def parse_save(response, record):
    global output_data
    ...
    output_data.append(record)

def get_url(url):
    response = '<html></html>'
    try:
        response = s.get(url)
        response = response.text
    except Exception as e:
        print(f'error here response: {e}')
    return response

links = pickle.load(open('links.pk', 'rb'))

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures = []
    ## submit all tasks
    for link in links:
        future = executor.submit(get_url, link)
        future.link = link
        futures.append(future)
    ## process as they come in
    for future in concurrent.futures.as_completed(futures):
        url = future.link
        try:
            data = future.result()
            parse_save(data, {"link": url})
        except Exception as e:
            print(f'issue with {url} : {e}')
            output_data.append({"link": url, "error": e})
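Tacking the link onto the future as an attribute (`future.link = link`) works; the pattern in the concurrent.futures docs instead maps each future to its input in a dict. Either way you can recover which input produced which result. A quick sketch of the dict variant with a stand-in `fetch` worker (no network; names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Stand-in worker; in the code above this would be get_url(link).
def fetch(link):
    return len(link)

links = ["https://a.example", "https://bb.example"]
results = {}

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to the link it was submitted with.
    future_to_link = {executor.submit(fetch, link): link for link in links}
    for future in as_completed(future_to_link):
        link = future_to_link[future]
        results[link] = future.result()

print(results)
```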