如何使用 Pandas apply() 函数对 API 应用异步调用

问题描述 投票:0回答:1

我有一个约 14,000 行的数据框,并试图通过调用 API 将一些数据填充到新列中。下面的代码检索了预期的响应,但是,似乎每次迭代都在等待响应以转到下一行。

这里是函数:

def market_sector_des(isin):
isin = '/isin/' + isin
return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]

我正在使用 xbbg 调用 Bloomberg API。

.apply() 函数返回预期的响应,

df['new_column'] = df['ISIN'].apply(market_sector_des)

但每个响应大约需要 2 秒,在 14,000 行时大约需要 8 小时。

有没有办法让这个apply函数异步,让所有的请求都并行发送?我已经将 dask 视为替代方案,但是,我也遇到了使用它的问题。

pandas asynchronous apply
1个回答
0
投票

您可以使用

multiprocessing
并行化 API 调用。将您的 Series 分成 THREAD 块,然后每个块运行一个进程:

import multiprocessing as mp
import pandas as pd
import numpy as np

THREADS = mp.cpu_count() - 1

def market_sector_des(isin):
    isin = '/isin/' + isin
    return isin.lower()
    return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]

def proxy_func(sr):
    return pd.Series([market_sector_des(isin) for isin in sr], index=sr.index)

if __name__ == '__main__':
    # df = your_dataframe_here
    split = np.array_split(df['ISIN'], THREADS)
    with mp.Pool(THREADS) as pool:
        data = pool.map(proxy_func, split)

    df['new_column'] = pd.concat(data)
© www.soinside.com 2019 - 2024. All rights reserved.