并行应用并不比普通应用快,pyhon的应用速度也不快

问题描述 投票:0回答:1

我正在使用AWS sagemaker实例,有64 giga ram和16个cpu核心,我试图让应用更快,但它不工作。

  1. 常规应用--CPU时间:用户1分59秒,系统3.51秒,总时间:2分59秒。3.51秒,总时间:2分3秒,墙时间:2分3秒。
def get_item_list_per_date(date, ads_list, date_list):

      loc = date_list.index(date)
      updated_ads_list = ads_list[:loc]
      return updated_ads_list

    df['item_list_update'] = df.progress_apply(lambda x: get_item_list_per_date(date=x['date'], item_list=x['item_list'], date_list=x['date_list']),axis=1)
  1. 使用Pool应用--CPU时间:用户48.8秒,系统5.39秒,总时间:54.2秒,墙时间:5分29秒。5.39秒,总时间:54.2秒,墙时间:5分29秒。
       import multiprocessing as mp

        def fx(df):
            def __fx(s):
                date = s['date']
                date_list = s['date_list']
                loc = date_list.index(date)
                return s['item_list'][:loc]

            return df.apply(__fx, axis=1)

        def parallel_apply(df):
            dfs = filter(lambda d: not d.empty, np.array_split(df, mp.cpu_count()))
            pool = mp.Pool(1)
            per_date = pd.concat(pool.map(fx, dfs))
            pool.close()
            pool.join()
            return per_date
  1. 用dask应用 - CPU时间:用户7分13秒,系统10.7秒,总时间:7分24秒,墙时间:7分16秒。10.7秒,总时间:7分24秒,墙时间:7分16秒。
    import dask.dataframe as dd
    import multiprocessing

    ddf= dd.from_pandas(df, npartitions= mp.cpu_count())
    df['item_list_update'] = ddf.map_partitions(lambda df: df.apply(lambda x: get_item_list_per_date(date=x['date'], item_list=x['item_list'], date_list=x['date_list']),axis=1)).compute(scheduler='processes')

问题出在哪里?

python multiprocessing apply
1个回答
0
投票

我对Pandas的经验不多,对Dask的经验更少,但无论如何,你是在用一个worker做一个pool。

pool = mp.Pool(1)

这一般来说是没有什么意义的

另外,我建议不要使用所有的核心,因为这会让你的系统很不灵敏。我一般会留2个核心。

num_cores = max(mp.cpu_count()-2, 1)

而在你的代码中代入这一点就会被修改

        dfs = filter(lambda d: not d.empty, np.array_split(df, num_cores))
        pool = mp.Pool(num_cores)

0
投票

我会使用python模块,叫做线程 在相同的cpu上运行进程,但线程不同。这样可以避免cpu进程启动时间过长。

def func(data1,data2):
    # send request to an API maybe
    # or download a file
    return True

# You have like 1000 or more threads which means you can really split the processes
with concurrent.futures.ThreadPoolExecutor() as executor:
    processes = [executor.submit(func, *[(data1, data2)]) for data1, data2 in datas]
    data = []
    for i in concurrent.futures.as_completed(processes):
        data.append(i.result())

多处理和多线程的区别。

用多处理来处理CPU重的东西,用多线程来下载文件等。

当我需要下载一个网站,然后用API把内容翻译成英文时,我使用了这个模块。这真的提高了我的速度表现。

我不知道你想做什么。所以,如果我是你,那么我会试试多线程。

© www.soinside.com 2019 - 2024. All rights reserved.