这是我的代码的简化版本。
import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster
def main():
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
csv_path_one = "" # both have 70 columns and around 70 million rows. at a size of about 25 gigabytes
csv_path_two = ""
# the columns are a mix of ints floats datetimes and strings
# almost all string lengths are less than 15 two of the longest string columns have a max length of 70
left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
right_df = dask_frame.read_csv(csv_path_one, sep=",", quotechar="\"", encoding="utf-8", dtype="object")
cand_keys = [""] # I have 3
merged = dask_frame.merge(left_df, right_df, how='outer', on=cand_keys, suffixes=("_L", "_R"),indicator=True)
missing_mask = merged._merge != 'both'
missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]
print(f"Running {client}")
missing_findings.to_csv("results/findings-*.csv")
cluster.close()
client.close()
if __name__ == '__main__':
main()
此示例永远不会结束,昏迷到某个特定的部分,然后一个或多个工作人员立即超过内存限制,保姆杀死了他们并回滚了所有工作人员的进度查看诊断页面时,通常在随机拆分任务的一半左右发生内存高峰。
我正在Windows上运行Dask 2.9.1。我可以更新Dask,但是当前的设置很麻烦,我不知道它是否可以解决我的问题
对2.15的更新已解决此问题。