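I have CSV files containing data with columns such as LAT, LON, MMSI, and VesselType. I want to read them in, group the rows by MMSI, and put all the LAT and LON values for each vessel into some kind of geographic line data structure.
So far, I read the data and save it to parquet files like this: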
from typing import List

import dask.dataframe as dd
import numpy as np
import pandas as pd
import spatialpandas as sp
from dask.diagnostics import ProgressBar
from spatialpandas.dask import GeoDataFrame
...
def convert_partition(self, df):
    # Turn one pandas partition into a GeoDataFrame of points.
    return GeoDataFrame({
        'geometry': sp.geometry.PointArray((df["LON"], df["LAT"])),
        'MMSI': df["MMSI"].fillna(0).astype('int32'),
        'category': df["VesselType"].replace(self.categories).astype('int32')
    })

# Empty frame with the same columns and dtypes, passed as `meta` below.
example = GeoDataFrame({
    'geometry': sp.geometry.PointArray([], dtype='float32'),
    'MMSI': np.array([], dtype='int32'),
    'category': np.array([], dtype='int32')
})

def generate_parquet_files(self, vessel_cols: List[str], df_cols: List[str], index: str = 'MMSI') -> None:
    csvs = self.base_dir + self.base_name + '*.csv'
    with ProgressBar():
        # One summary row per vessel (last value of each column).
        df = dd.read_csv(csvs, usecols=vessel_cols, assume_missing=True)
        vessels = df.groupby(index).last().reset_index().compute()
        vessels[index] = vessels[index].astype('int32')
        vessels.to_parquet(self.vessels_file)
        # All positions, converted to point geometries and cached as parquet.
        gdf = dd.read_csv(csvs, usecols=df_cols, assume_missing=True)
        gdf = gdf.map_partitions(self.convert_partition, meta=self.example).persist()
        gdf = gdf.pack_partitions_to_parquet(self.cache_file, npartitions=64).persist()
Then, after loading the data into a variable, I group it as follows:
grouped = ais.df.compute().groupby(['MMSI', 'category'], group_keys=True).apply(
    lambda x: pd.DataFrame({
        'geometry': [[[point.x, point.y] for point in x['geometry']]],
    }))
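It does work, but there are two problems:
df.plot(ax=ax, cmap=...,...)
or anything like that.
Overall, the code feels a bit messy, so if you have any other suggestions for improvement, I would be glad to hear them.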
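To make the target structure concrete, here is a minimal plain-Python sketch of the grouping described above, using only the standard library rather than dask or spatialpandas. The sample rows and the helper name `group_tracks` are made up for illustration, and the sketch assumes the rows for each vessel already appear in chronological order, since no timestamp column is mentioned.

```python
import csv
import io
from collections import defaultdict

# Hypothetical sample data; the real files have more rows (and possibly more columns).
SAMPLE_CSV = """MMSI,LAT,LON,VesselType
111,10.0,20.0,70
222,50.5,-3.0,30
111,10.1,20.2,70
222,50.6,-3.1,30
111,10.3,20.4,70
"""


def group_tracks(csv_file):
    """Return {(mmsi, vessel_type): [[lon, lat], ...]}, keeping file order."""
    tracks = defaultdict(list)
    for row in csv.DictReader(csv_file):
        key = (int(row["MMSI"]), int(row["VesselType"]))
        # Store x (LON) before y (LAT), matching the PointArray order used above.
        tracks[key].append([float(row["LON"]), float(row["LAT"])])
    return dict(tracks)


tracks = group_tracks(io.StringIO(SAMPLE_CSV))
for (mmsi, vessel_type), points in tracks.items():
    print(mmsi, vessel_type, points)
```

Each value is one ordered list of `[lon, lat]` pairs per vessel, which is the same shape the `groupby(...).apply(...)` call produces in its `geometry` column. Whether that list should then be turned into a dedicated line geometry type is left open here.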