高效加载和分组空间 CSV 数据

问题描述 投票:0回答:0

我有 CSV 文件,其中包含 LAT、LON、MMSI、VesselType 等列的数据。我想读入它并按 MMSI 对其进行分组,并将所有 LAT 和 LON 放入某种地理线数据结构中。

到目前为止,我读取数据并将其保存到镶木地板文件中,如下所示:

from spatialpandas.dask import GeoDataFrame
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import spatialpandas as sp
...
def convert_partition(self, df):
        return GeoDataFrame({
            'geometry': sp.geometry.PointArray((df["LON"], df["LAT"])),
            'MMSI':     df["MMSI"].fillna(0).astype('int32'),
            'category': df["VesselType"].replace(self.categories).astype('int32')
        })

    example = GeoDataFrame({
        'geometry': sp.geometry.PointArray([], dtype='float32'),
        'MMSI':     np.array([], dtype='int32'),
        'category': np.array([], dtype='int32')
    })

def generate_parquet_files(self, vessel_cols: List[str], df_cols: List[str], index: str = 'MMSI') -> None:
    csvs = self.base_dir + self.base_name + '*.csv'
    with ProgressBar():
        df = dd.read_csv(csvs, usecols=vessel_cols, assume_missing=True)
        vessels = df.groupby(index).last().reset_index().compute()
        vessels[index] = vessels[index].astype('int32')
        vessels.to_parquet(self.vessels_file)

        gdf = dd.read_csv(csvs, usecols=df_cols, assume_missing=True)
        gdf = gdf.map_partitions(self.convert_partition, meta=self.example).persist()

        gdf = gdf.pack_partitions_to_parquet(self.cache_file, npartitions=64).persist()

然后将数据加载到变量中后,我将其分组如下:

grouped = ais.df.compute().groupby(['MMSI', 'category'], group_keys=True).apply(
    lambda x: pd.DataFrame({
        'geometry': [[[point.x, point.y] for point in x['geometry']]],
    }))

它确实有效,但有两个问题:

  1. 超级慢,我想边写parquet文件边做分组
  2. 这只是一个列表列表,我正在寻找一种地理数据类型的东西(我真的不知道哪个是最好的)并且我可以在稍后调用
    df.plot(ax=ax, cmap=...,...)
    之类的东西

总的来说,代码感觉有点乱,所以如果您有其他改进建议,我很乐意听取。

python dataframe group-by dask
© www.soinside.com 2019 - 2024. All rights reserved.