我正在使用AWS sagemaker实例,有64 giga ram和16个cpu核心,我试图让应用更快,但它不工作。
def get_item_list_per_date(date, ads_list, date_list):
loc = date_list.index(date)
updated_ads_list = ads_list[:loc]
return updated_ads_list
df['item_list_update'] = df.progress_apply(lambda x: get_item_list_per_date(date=x['date'], item_list=x['item_list'], date_list=x['date_list']),axis=1)
import multiprocessing as mp
def fx(df):
def __fx(s):
date = s['date']
date_list = s['date_list']
loc = date_list.index(date)
return s['item_list'][:loc]
return df.apply(__fx, axis=1)
def parallel_apply(df):
dfs = filter(lambda d: not d.empty, np.array_split(df, mp.cpu_count()))
pool = mp.Pool(1)
per_date = pd.concat(pool.map(fx, dfs))
pool.close()
pool.join()
return per_date
import dask.dataframe as dd
import multiprocessing
ddf= dd.from_pandas(df, npartitions= mp.cpu_count())
df['item_list_update'] = ddf.map_partitions(lambda df: df.apply(lambda x: get_item_list_per_date(date=x['date'], item_list=x['item_list'], date_list=x['date_list']),axis=1)).compute(scheduler='processes')
问题出在哪里?
我对Pandas的经验不多,对Dask的经验更少,但无论如何,你是在用一个worker做一个pool。
pool = mp.Pool(1)
这一般来说是没有什么意义的
另外,我建议不要使用所有的核心,因为这会让你的系统很不灵敏。我一般会留2个核心。
num_cores = max(mp.cpu_count()-2, 1)
而在你的代码中代入这一点就会被修改
dfs = filter(lambda d: not d.empty, np.array_split(df, num_cores))
pool = mp.Pool(num_cores)
我会使用python模块,叫做线程 在相同的cpu上运行进程,但线程不同。这样可以避免cpu进程启动时间过长。
def func(data1,data2):
# send request to an API maybe
# or download a file
return True
# You have like 1000 or more threads which means you can really split the processes
with concurrent.futures.ThreadPoolExecutor() as executor:
processes = [executor.submit(func, *[(data1, data2)]) for data1, data2 in datas]
data = []
for i in concurrent.futures.as_completed(processes):
data.append(i.result())
用多处理来处理CPU重的东西,用多线程来下载文件等。
当我需要下载一个网站,然后用API把内容翻译成英文时,我使用了这个模块。这真的提高了我的速度表现。
我不知道你想做什么。所以,如果我是你,那么我会试试多线程。