我正在滑入
dask
并想利用dask.distribution
库来并行计算。考虑以下示例数据框:
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 100000000
# init pandas data frame
df = pd.DataFrame(data={
"x1": np.random.choice([0, 1, np.nan], size=N, p=[.45, .3, .25]),
"x2": np.random.choice([0, 1, np.nan], size=N, p=[.2, .4, .4]),
"x3": np.random.choice([0, 1], size=N, p=[.1, .9]),
})
# init dask data frame
ddf = dd.from_pandas(df, npartitions=6)
我想并行化一个简单的外部函数,例如:
def get_coalesce(ddf: dd.DataFrame, x1: str, x2: str, x3: str) -> dd.DataFrame:
# Checks whether function is called
print(1)
# Assigns new columns whole data frame
ddf["x4"] = ddf[x1].combine_first(ddf[x2]).combine_first(ddf[x3])
return ddf
没有真正改变它的源代码。有解决办法吗?
我正在考虑使用集群,但这似乎是错误的方法:
from dask.distributed import LocalCluster, Client
with LocalCluster(
n_workers=16,
processes=True,
threads_per_worker=2,
memory_limit="10GB",
) as cluster, Client(cluster) as client:
df = get_coalesce(ddf, **dict(zip(ddf, ddf))).compute()