Sending multiple API POST requests asynchronously with requests and concurrent.futures

Question · Votes: 0 · Answers: 2

Along with requests, I have been trying to use concurrent.futures to send several different direct messages from several different users. The goal of the application I am building is to send these direct messages as quickly as possible, and sending each request individually takes too long.

The code below is what I have tried, but it is clear that the futures never execute the requests stored in the list.

Any advice on how to do this would be greatly appreciated.

from concurrent import futures
import requests
from requests_oauthlib import OAuth1
import json
from datetime import datetime

startTime = datetime.now()

URLS = ['https://api.twitter.com/1.1/direct_messages/new.json'] * 1

def get_oauth():
    oauth = OAuth1("xxxxxx",
                client_secret="zzzxxxx",
                resource_owner_key="xxxxxxxxxxxxxxxxxx",
                resource_owner_secret="xxxxxxxxxxxxxxxxxxxx")
    return oauth

oauth = get_oauth()

req = []


def load_url(url, timeout):
    req.append(requests.post(url, data={'screen_name':'vancephuoc','text':'hello pasdfasasdfdasdfasdffpls 1 2 3 4 5'}, auth=oauth, stream=True, timeout=timeout))
    req.append(requests.post(url, data={'screen_name':'vancephuoc','text':'hello this is tweetnumber2 1 2 3 4 5 7'}, auth=oauth, stream=True, timeout=timeout))



with futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_to_url = dict((executor.submit(req, url, 60 ), url)
                         for url in URLS)
    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        print ("DM SENT IN")
        print (datetime.now()-startTime)
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
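The immediate problem in the code above is `executor.submit(req, url, 60)`: `submit` expects a callable as its first argument, but `req` is the result list, so `load_url` is never called. Below is a minimal sketch of the corrected pattern, with a stub function standing in for `requests.post` (the real call needs valid OAuth credentials and network access):

```python
from concurrent import futures
from datetime import datetime

start = datetime.now()

URLS = ['https://api.twitter.com/1.1/direct_messages/new.json']

# Stub standing in for requests.post so the sketch runs offline.
def send_dm(url, text, timeout):
    return f'sent {text!r} to {url}'

def load_url(url, timeout):
    # Return the results instead of appending to a shared list;
    # the future carries them back to the main thread.
    return [send_dm(url, 'hello 1 2 3 4 5', timeout),
            send_dm(url, 'hello this is tweet number 2', timeout)]

with futures.ThreadPoolExecutor(max_workers=100) as executor:
    # Pass the *function* load_url to submit, not the result list.
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in futures.as_completed(future_to_url):
        url = future_to_url[future]
        data = future.result()
        print(f'{url}: {len(data)} messages in {datetime.now() - start}')
```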
Tags: python, twitter
2 Answers

6 votes

It may be worth looking at some of the existing libraries that try to simplify concurrent use of requests.

From: http://docs.python-requests.org/en/latest/user/advanced/#blocking-or-non-blocking

[..] there are lots of projects out there that combine Requests with one of Python's asynchronicity frameworks. Two excellent examples are grequests and requests-futures.
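The core idea behind requests-futures is a session whose `.post` hands the request to a thread pool and returns a `Future` immediately. That idea can be sketched with the standard library alone; the class, the stub transport, and the URL below are illustrative stand-ins, not the library's actual API:

```python
from concurrent.futures import ThreadPoolExecutor

class FutureSession:
    """Toy sketch of the requests-futures idea: each .post call is
    submitted to a thread pool and a Future is returned immediately."""
    def __init__(self, send, max_workers=10):
        self._send = send          # stand-in for requests.Session.post
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def post(self, url, data=None):
        return self._pool.submit(self._send, url, data)

# Stub transport so the sketch runs without network access.
def fake_post(url, data):
    return {'url': url, 'echo': data, 'status': 200}

session = FutureSession(fake_post)
futures = [session.post('https://example.invalid/dm', data={'text': f'msg {i}'})
           for i in range(3)]
results = [f.result() for f in futures]   # blocks until each completes
print([r['echo']['text'] for r in results])
```

With the real library you would instead instantiate `FuturesSession` from `requests_futures.sessions` and call `.post` on it the same way.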


0 votes

The pattern here is slightly different from the one in the docs, and easier to read. It can be used as boilerplate code to request and process data quickly:

import pickle
import concurrent.futures

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

s = requests.Session()
# s.proxies = proxies  # optional: assign a proxies dict here if needed
retries = Retry(total=5,
                backoff_factor=0.1,
                status_forcelist=[500, 502, 503, 504])
s.mount('http://', HTTPAdapter(max_retries=retries))
s.mount('https://', HTTPAdapter(max_retries=retries))

output_data = []

def parse_save(response, record):
    global output_data
    ...
    output_data.append(record)

def get_url(url):
    response = '<html></html>'
    try:
        response = s.get(url)
        response = response.text
    except Exception as e:
        print(f'error fetching {url}: {e}')
    return response

links = pickle.load(open('links.pk', 'rb'))

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    ## submit all tasks
    futures = []
    for link in links:
        future = executor.submit(get_url, link)
        future.link = link
        futures.append(future)
    ## process as they come in
    for future in concurrent.futures.as_completed(futures):
        url = future.link
        try:
            data = future.result()
            parse_save(data, {"link": url})
        except Exception as e:
            print(f'issue with {url}: {e}')
            output_data.append({"link": url, "error": e})
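The attach-an-attribute trick above (`future.link = link`) works because `Future` objects accept arbitrary attributes, but `executor.map` pairs inputs with outputs without mutating futures at all, since it preserves input order. A standard-library-only sketch, with a stub fetcher in place of a real HTTP call:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for get_url so the sketch runs offline; the URLs are placeholders.
def fake_get(url):
    return f'<html>{url}</html>'

links = ['https://example.invalid/a', 'https://example.invalid/b']

with ThreadPoolExecutor(max_workers=4) as executor:
    # map yields results in input order, so zip pairs them back with links
    results = dict(zip(links, executor.map(fake_get, links)))

print(results['https://example.invalid/a'])
```

The trade-off is that `map` yields results in submission order, while `as_completed` lets you process whichever request finishes first.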
© www.soinside.com 2019 - 2024. All rights reserved.