How can Polars DataFrame metadata be maintained through processing, serialization, and deserialization with the Apache Arrow IPC format in Python?


I would like to track metadata associated with a Polars DataFrame through processing, serialization, and deserialization, using the Apache Arrow IPC format in python-polars. How can this be done?

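A common use case for DataFrame metadata is storing information about how the DataFrame was generated or about the data in its columns, for example the database the data was loaded from, an associated time zone, or the CRS of coordinates stored in the DataFrame.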

python python-3.x dataframe metadata python-polars
1 Answer

One possible solution is to subclass the Polars DataFrame as a "MetaDataFrame". By adding a SimpleNamespace as the attribute "meta", metadata can be stored and accessed with dot syntax.
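The metadata container itself needs nothing from Polars. As a minimal sketch using only the standard library, here is a SimpleNamespace subclass used as an attribute-style metadata store, including the JSON round trip that the IPC helpers rely on; the standard json module stands in for orjson here, and the field names are only examples.

```python
import json
from types import SimpleNamespace


class DfMeta(SimpleNamespace):
    """Attribute-style container for arbitrary metadata fields."""


meta = DfMeta(db_name="my_db", crs="EPSG:4326")
meta.crs = "EPSG:3857"             # overwrite an existing field
meta.tz_name = "America/New_York"  # add a new field

# Round-trip through JSON bytes, as the answer's IPC helpers do with orjson
payload = json.dumps(vars(meta)).encode("utf-8")
restored = DfMeta(**json.loads(payload))

print(restored.crs)      # EPSG:3857
print(restored == meta)  # True (SimpleNamespace compares attributes)
```

Because any JSON-serializable value can be stored this way, the metadata fields do not have to be declared in advance.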

As I mentioned in my answer to a Polars issue, given a MetaDataFrame with metadata, you can add, overwrite, and access it via the "meta" attribute:

df.meta.crs = "EPSG:4326"
df.meta.crs = "EPSG:3857"
print(df.meta.crs)

In addition, by leveraging the Arrow IPC specification, you can provide extra functions that write and read the DataFrame while also managing its metadata:

write_ipc_with_meta(df, "my_df.ipc", df.meta)
loaded_df = read_ipc_with_meta("my_df.ipc")
print(loaded_df.meta.crs)
# metadataframe.py
"""Provides functionality for handling Polars DataFrames with custom metadata.

This module enables the serialization and deserialization of Polars DataFrames
along with associated metadata, utilizing the IPC format for data interchange
and `orjson` for fast JSON processing. Metadata management is facilitated through
the use of the `DfMeta` class, a flexible container for arbitrary metadata fields.
Key functions include `write_ipc_with_meta` and `read_ipc_with_meta`, which allow
for the persistence of metadata across storage cycles, enhancing data context
retention and utility in analytical workflows.

Note:
    This module was not written for efficiency or performance, but to solve the
    use case of persisting metadata with Polars DataFrames. It is not recommended for
    production use, but rather as a starting point for more robust metadata management.

Classes:
    DfMeta: A simple namespace for metadata management.
    MetaDataFrame: An extension of Polars DataFrame to include metadata.

Functions:
    write_ipc_with_meta(df, filepath, meta): Serialize DataFrame and metadata to IPC.
    read_ipc_with_meta(filepath): Deserialize DataFrame and metadata from IPC.
"""


# Standard Library
from typing import Any
from types import SimpleNamespace

# Third Party
import orjson
import polars as pl
import pyarrow as pa


class DfMeta(SimpleNamespace):
    """A simple namespace for storing MetaDataFrame metadata.

    Usage:
        meta = DfMeta(
            name="checkins",
            db_name="my_db",
            tz_name="America/New_York",
            crs="EPSG:4326"
        )
    """

    # Generate a string representation of metadata keys
    def __repr__(self) -> str:
        keys = ", ".join(self.__dict__.keys())
        return f"DfMeta({keys})"

    # Alias __str__ to __repr__ for consistent string representation
    def __str__(self) -> str:
        return self.__repr__()


class MetaDataFrame(pl.DataFrame):
    """A Polars DataFrame extended to include custom metadata.

    Attributes:
        meta (DfMeta): A simple namespace for storing metadata.

    Usage:

        # Create MetaDataFrame with metadata
        meta = DfMeta(
            name="my_df",
            db_name="my_db",
            tz_name="America/New_York",
            crs="EPSG:4326"
        )
        df = MetaDataFrame({"a": [1, 2, 3]}, meta=meta)

        # Create MetaDataFrame then add metadata
        df = MetaDataFrame({"a": [1, 2, 3]})
        df.meta.name = "my_df"
        df.meta.db_name = "my_db"
        df.meta.tz_name = "America/New_York"
        df.meta.crs = "EPSG:4326"

        # Overwrite metadata
        df.meta.crs = "EPSG:3857"

        # Write MetaDataFrame to IPC with metadata
        write_ipc_with_meta(df, "my_df.ipc", df.meta)

        # Read MetaDataFrame from IPC with metadata
        loaded_df = read_ipc_with_meta("my_df.ipc")

        # Access metadata
        print(loaded_df.meta.name)
        print(loaded_df.meta_as_dict())
    """

    # Initialize the DataFrame, then attach a `meta` namespace (empty if none is given)
    def __init__(self, data: Any = None, *args, meta: DfMeta = None, **kwargs):
        super().__init__(data, *args, **kwargs)
        self.meta = meta if meta is not None else DfMeta()

    def meta_as_dict(self) -> dict[str, Any]:
        """Returns a copy of the metadata as a dictionary.

        Returns:
            dict[str, Any]: A dictionary representation of the metadata.
        """
        # Copy so that mutating the result does not silently change `self.meta`
        return dict(vars(self.meta))


def write_ipc_with_meta(df: MetaDataFrame, filepath: str, meta: DfMeta) -> None:
    """Serialize MetaDataFrame and metadata stored in `meta` attr to an IPC file.

    Args:
        df (MetaDataFrame): The MetaDataFrame to serialize.
        filepath (str): The path to the IPC file.
        meta (DfMeta): The metadata to serialize.

    Returns:
        None
    """
    # Convert Polars DataFrame to Arrow Table
    arrow_table = df.to_arrow()

    # Serialize metadata to JSON (orjson.dumps returns bytes)
    meta_json = orjson.dumps(vars(meta))

    # Embed metadata into the Arrow schema, keeping any schema metadata already present
    schema_meta = dict(arrow_table.schema.metadata or {})
    schema_meta[b"meta"] = meta_json
    arrow_table_with_meta = arrow_table.replace_schema_metadata(schema_meta)

    # Write the table (schema metadata included) using the Arrow IPC streaming format
    with pa.OSFile(filepath, "wb") as sink:
        with pa.RecordBatchStreamWriter(sink, arrow_table_with_meta.schema) as writer:
            writer.write_table(arrow_table_with_meta)


def read_ipc_with_meta(filepath: str) -> MetaDataFrame:
    """Deserialize DataFrame and metadata from an IPC file.

    Args:
        filepath (str): The path to the IPC file.

    Returns:
        MetaDataFrame: The deserialized DataFrame with metadata stored in `meta` attr.
    """
    # Read Arrow table from IPC file
    with pa.OSFile(filepath, "rb") as source:
        reader = pa.ipc.open_stream(source)
        table = reader.read_all()

    # Extract and deserialize metadata from the Arrow schema
    # (schema.metadata is None when the file carries no metadata at all)
    meta_json = (table.schema.metadata or {}).get(b"meta")
    if meta_json:
        meta_dict = orjson.loads(meta_json)
        meta = DfMeta(**meta_dict)
    else:
        meta = DfMeta()

    # Convert Arrow table to Polars DataFrame and attach metadata
    df = pl.from_arrow(table)
    extended_df = MetaDataFrame(df, meta=meta)
    return extended_df
© www.soinside.com 2019 - 2024. All rights reserved.