Why is so much data missing after dask.dataframe to_csv?

Problem description

I'm new to Dask distributed and am running a simple test to learn it, but I'm getting a very strange result. Here is my code:

import numpy as np
import pandas as pd

import dask.dataframe as dd

data_vol = 2000
index = pd.date_range("2021-09-01", periods=data_vol, freq="1h")
df = pd.DataFrame({"a": np.arange(data_vol), "b": ["abcaddbe"] * data_vol, 'time': index})
ddf = dd.from_pandas(df, npartitions=10)

df2 = pd.DataFrame({"c": np.arange(data_vol), "d": ["xyzopq"] * data_vol, 'time': reversed(index)})
ddf2 = dd.from_pandas(df2, npartitions=10)

ddf['timestamp'] = ddf.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
ddf2['timestamp'] = ddf2.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))

def merge_onindex(ddf, ddf2):
    ret = ddf.merge(ddf2)
    ret["add"] = ret.a + ret.c + 1
    return ret


from dask.distributed import Client
import dask

dask.config.set({"dataframe.shuffle.method": "tasks"})
client = Client("tcp://172.17.0.2:8786")

ddf_st = client.scatter(ddf.set_index('timestamp'), broadcast=True)
ddf2_st = client.scatter(ddf2.set_index("timestamp"), broadcast=True)

dd_merge_res = client.submit(merge_onindex, ddf_st, ddf2_st)
## Future: merge_onindex status: finished, type: dask.dataframe.core.DataFrame, key: merge_onindex-da1eb54a93de0c19af3093b76230b9f6

dd_merge_res.result().to_csv("/jupyter/merge_single.csv", single_file=True)

Then I run

wc -l merge_single.csv

and it reports only a few hundred rows; moreover, the count is different every time I run it.

Here are the first few lines of the output:

,a,b,time,c,d,add
0,19,abcaddbe,2021-09-01 19:00:00,1980,xyzopq,2000
1,22,abcaddbe,2021-09-01 22:00:00,1977,xyzopq,2000
2,35,abcaddbe,2021-09-02 11:00:00,1964,xyzopq,2000
3,37,abcaddbe,2021-09-02 13:00:00,1962,xyzopq,2000
4,50,abcaddbe,2021-09-03 02:00:00,1949,xyzopq,2000
5,58,abcaddbe,2021-09-03 10:00:00,1941,xyzopq,2000
6,78,abcaddbe,2021-09-04 06:00:00,1921,xyzopq,2000
7,84,abcaddbe,2021-09-04 12:00:00,1915,xyzopq,2000
8,112,abcaddbe,2021-09-05 16:00:00,1887,xyzopq,2000

The rows that are present are correct, but many other rows are missing!

Thanks for your help!

My environment:

docker base image: python:3.8
dask: 2023.5.0
Two Docker containers as workers and one as the master/scheduler; each has 3 CPUs.
Tags: python, dask-distributed, dask-dataframe
1 Answer

The problem you are seeing is likely caused by scattering the entire Dask DataFrame and then applying the merge in a submitted task. That approach can interfere with the data shuffle, so some rows may be lost or duplicated. To fix this, perform the merge directly on the Dask DataFrames without using scatter; Dask will handle the distributed computation internally.
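
For illustration, a minimal sketch of that direct approach (my own addition, reusing the question's merge_onindex helper, cluster address, and output directory; not part of the original answer) could look like this:

from dask.distributed import Client

# Connect to the scheduler first, then build the merge lazily on the Dask
# DataFrames themselves instead of scattering and submitting them as futures.
client = Client("tcp://172.17.0.2:8786")

merged = merge_onindex(ddf.set_index('timestamp'), ddf2.set_index('timestamp'))
merged.to_csv("/jupyter/merge_direct.csv", single_file=True)

The rewritten example below takes a slightly different route and performs the join partition by partition with map_partitions: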

import numpy as np
import pandas as pd

import dask.dataframe as dd

data_vol = 2000
index = pd.date_range("2021-09-01", periods=data_vol, freq="1h")
df = pd.DataFrame({"a": np.arange(data_vol), "b": ["abcaddbe"] * data_vol, 'time': index})
ddf = dd.from_pandas(df, npartitions=10)

df2 = pd.DataFrame({"c": np.arange(data_vol), "d": ["xyzopq"] * data_vol, 'time': reversed(index)})
ddf2 = dd.from_pandas(df2, npartitions=10)

ddf['timestamp'] = ddf.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
ddf2['timestamp'] = ddf2.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))

def merge_onindex(ddf, ddf2):
    ret = ddf.merge(ddf2)
    ret["add"] = ret.a + ret.c + 1
    return ret

ddf = ddf.set_index('timestamp')
ddf2 = ddf2.set_index("timestamp")

# Apply the merge function partition-wise with map_partitions
dd_merge_res = ddf.map_partitions(merge_onindex, ddf2)

from dask.distributed import Client

# Connecting to the scheduler registers this client as the default, so the
# lazy graph built above runs on the cluster when to_csv triggers compute.
client = Client("tcp://172.17.0.2:8786")

dd_merge_res.to_csv("/jupyter/merge_single_custom.csv", single_file=True)
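
As a sanity check (again my own addition, not part of the original answer), you can count the rows of the merged frame before relying on the CSV; with 2000 hourly timestamps on each side, the join should return 2000 rows:

# len() on a Dask DataFrame triggers a computation on the cluster and
# returns the total number of rows across all partitions.
print(len(dd_merge_res))   # expected: 2000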