使用多线程的异步请求

问题描述 投票:0回答:1

我有很多公司,正在调用REST API以获取每个公司的每日股票价格。详细信息存储在PostgreSQL数据库中。核心功能如下:

async def get_data_asynchronous():
conn = await asyncpg.connect(**DBConn)

path = 'path'
source = pd.read_excel(io=path + 'companies.xlsx', sheet_name='data')

retries = Retry(total=2, backoff_factor=1, status_forcelist=[404, 502, 503, 504])
dates = pd.date_range('2015-01-01', '2019-12-01', freq='D').strftime("%d-%m-%Y").tolist() 

with ThreadPoolExecutor(max_workers=10) as executor:
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retries))

        loop = asyncio.get_event_loop()

        for index, inputrow in source.iterrows():
            try:
                if int(inputrow['rowid']) > 0:
                    compid = inputrow['compid'].lower().strip()

                    tasks = [
                        loop.run_in_executor(
                            executor,
                            fetch,
                            *(session, compid, datetime.datetime.strptime(str(dates[i-1]), '%d-%m-%Y'), datetime.datetime.strptime(str(dates[i]), '%d-%m-%Y'))
                        )
                        for i in range(len(dates))
                    ]

                    for content in await asyncio.gather(*tasks):
                        if content is not None:
                            for data in content:
                                compid = data.get('compid', '')
                                date = data.get('date', '')
                                stock_price = data.get('sprice', '')

                                try:
                                    await conn.execute('''
                                                    INSERT INTO comp_dailyhistory VALUES($1, $2, $3)
                                                    ''', compid, date, stock_price)
                                except Exception as e:
                                    print('ERROR')
                                    pass
                    pass
            except Exception as e:
                print(str(e))
                pass

在上面的功能中,我首先从excel工作表(源)获取公司列表,并创建日期列表。由于列表中有超过20万家公司,因此我创建了一个ThreadPoolExecutor,最多可容纳10个工人。目的是传递日期范围内的每个公司ID(复杂)和两个连续的日期以异步方式转换为“提取”功能,以加快整个数据收集过程。提取功能如下所示:

def fetch(session, compid, start, stop):
base_url = 'baseurl'

try:
    with session.get(base_url + 'compid=' + compid + '&begin=' + str(int(start.timestamp())) + '&end=' + str(int(stop.timestamp())), timeout=None) as data:
        content = []

        if data.status_code == 200:
            for item in data.json():
                ret = {'compid': compid, 'date': str(date), 'sprice': sprice}
                content.append(ret)
            return content
        else:
            return None
except Exception as e:
    return None 

提取函数使用request.get来获取公司在开始和停止日期之间的股价列表,将JSON响应解析为键值对列表,然后将其返回给调用函数。然后由调用函数中的asyncio.gather函数拾取返回的列表,其中每个股价都使用asyncpg存储在postgreSQL中。其余代码如下:

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)

main()    

此设置的主要问题是该脚本似乎并没有针对给定公司收取全部价格。例如,对于compid = 1,应该精确地有600个每日价格。但是,每次运行脚本时,我都会得到不同的结果,该结果始终低于真实计数。例如,我在第一轮中获得550的每日价格,在第二轮中获得570的日价格,在第三轮中获得540的日价格,依此类推。...

为什么我的脚本无法提取600个每日价格的完整清单?我的某些请求是否以某种方式被放弃?我尝试了aiohttp请求的替代方法,但是并没有取得太大进展。

我没有多线程编程的经验,尤其是asyncio的经验,真的希望在这方面有任何帮助吗?在此先感谢您的时间。

python multithreading asynchronous python-asyncio aiohttp
1个回答
0
投票

我已经完成了多个涉及抓取网站的项目,每天都可以获取数千个股票价格。正如dano所建议的,该问题与您的错误处理有关:

except Exception as e:
    return None

这不会处理失败的请求。您可以将失败的URL附加到列表中,然后在脚本的末尾再次使用这些URL运行“获取”功能。如果您的信息很重要,您甚至可以定义一个函数,该函数尝试至少5-10次以下载股票信息,然后返回“无”。

更多与多线程问题有关,您也必须是

希望有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.