Applying a method across dask dataframe partitions


I am getting into dask and would like to use the dask.distributed library to parallelize my computations. Consider the following example data frame:

import pandas as pd
import numpy as np
import dask.dataframe as dd

N = 100000000
# init pandas data frame
df = pd.DataFrame(data={
    "x1": np.random.choice([0, 1, np.nan], size=N, p=[.45, .3, .25]),
    "x2": np.random.choice([0, 1, np.nan], size=N, p=[.2, .4, .4]),
    "x3": np.random.choice([0, 1], size=N, p=[.1, .9]),
})
# init dask data frame 
ddf = dd.from_pandas(df, npartitions=6)
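
As a side note (my own addition, not part of the original question): nothing is computed at this point; dd.from_pandas only records a lazy split into 6 partitions, which can be inspected like so:

print(ddf.npartitions)                    # 6
print(ddf.map_partitions(len).compute())  # row count of each partition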

I would like to parallelize a simple external function such as the following:

def get_coalesce(ddf: dd.DataFrame, x1: str, x2: str, x3: str) -> dd.DataFrame:
    # Check whether the function actually runs
    print(1)
    # Assign a new column across the whole data frame
    ddf["x4"] = ddf[x1].combine_first(ddf[x2]).combine_first(ddf[x3])
    return ddf

without really changing its source code. Is there a way to do this?
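
For context, a minimal sketch of what I mean (this example is mine, not an accepted answer): dask's generic mechanism for this is map_partitions, which applies an unmodified function once to each underlying pandas partition, and combine_first exists on both pandas and dask Series, so the function body works unchanged:

# Run get_coalesce independently on every pandas partition.
# dask infers the output schema by first calling the function on an
# empty dummy frame, so print(1) fires one extra time at graph-build
# time before the per-partition calls happen at compute time.
ddf_out = ddf.map_partitions(get_coalesce, "x1", "x2", "x3")
result = ddf_out.compute()

Note that calling get_coalesce(ddf, "x1", "x2", "x3") on the dask frame directly also builds a lazy, partition-parallel graph, which is why print(1) then appears only once: the function body executes at graph-construction time, not once per partition.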

I was thinking about using a cluster, but that seems like the wrong approach:

from dask.distributed import LocalCluster, Client

with LocalCluster(
    n_workers=16,
    processes=True,
    threads_per_worker=2,
    memory_limit="10GB",
) as cluster, Client(cluster) as client:
    # dict(zip(ddf, ddf)) maps each column name to itself,
    # i.e. x1="x1", x2="x2", x3="x3"
    df = get_coalesce(ddf, **dict(zip(ddf, ddf))).compute()
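
One observation of my own on this attempt (worker counts here are illustrative): any .compute() issued while a Client is active already runs on that cluster's workers, so the pattern itself is not wrong; assuming the diagnostics dashboard is installed, the parallel execution can be watched like this:

from dask.distributed import LocalCluster, Client

with LocalCluster(n_workers=4, threads_per_worker=2) as cluster, Client(cluster) as client:
    # The dashboard shows per-worker task activity while compute() runs
    print(client.dashboard_link)
    out = ddf.map_partitions(get_coalesce, "x1", "x2", "x3").compute()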
Tags: python, pandas, parallel-processing, dask, dask-distributed