Arrow RecordBatch 作为 Polars DataFrame

问题描述 投票:0回答:1

我经常需要从 Microsoft SQL 服务器获取大量数据,以便使用 Rust 中的 Polars 进行操作,并且根据企业安全策略或多或少被迫使用 ODBC 进行这些连接。 ODBC 要求限制我使用 ConnectorX 等成熟且功能齐全的库。我能够使用 arrow_odbc 连接并高效地将查询结果读取到 Arrow 中的 RecordBatch 对象中,但无法将这些 RecordBatch 对象转换为 Polars DataFrames。

因为

RecordBatch
Series
的实际数据组件具有相同的底层表示,我认为可以从
DataFrame
零拷贝创建
RecordBatch

但是在

columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);
我收到错误:

mismatched types
expected struct `std::boxed::Box<(dyn polars::export::polars_arrow::array::Array + 'static)>`
   found struct `Arc<dyn arrow::array::Array>`

我的印象是

Arc<dyn Array>
就是
ArrayRef
,真正的问题也许是我有一个
Arc<dyn arrow::array::Array>
并且
Series::from_arrow()
期待着Polars
Arc<Array>
?如果是这样,我该如何解决?

我的完整代码如下供参考。

use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
use arrow::record_batch::RecordBatch;
use polars::prelude::*;
use anyhow::Result;

const CONNECTION_STRING: &str = "...";

pub fn test() -> Result<()> {

    let odbc_environment = Environment::new()?;
     
    let connection = odbc_environment.connect_with_connection_string(
        CONNECTION_STRING,
        ConnectionOptions::default()
    )?;

    let cursor = connection.execute("SELECT * FROM Backcast_Power_Plant_Map", ())?.unwrap();

    let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;

    fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
        let schema = batch.schema();
        let mut columns = Vec::with_capacity(batch.num_columns());
        for (i, column) in batch.columns().iter().enumerate() {
            columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);
        }
        Ok(DataFrame::from_iter(columns))
    }

    for batch in arrow_record_batches {
        dbg!(record_batch_to_dataframe(&batch?));
    }

    Ok(())
}
rust odbc apache-arrow rust-polars
1个回答
0
投票

看来

polars
arrow-odbc
使用不同的箭头箱:
polars
使用
polars-arrow
arrow-odbc
使用
arrow
。前者的数组类型是
Box<dyn polars_arrow::array::Array>
,而后者的类型是
ArrayRef
,它是
Arc<dyn arrow::array::Array>
的别名。

对我们来说幸运的是,

polars-arrow
板条箱中存在一个兼容层。您可以通过
From
impls 在两种类型(以及更多类型)之间进行转换:

use anyhow::Result;
use arrow::record_batch::RecordBatch;
use arrow_odbc::{
    odbc_api::{ConnectionOptions, Environment},
    OdbcReaderBuilder,
};
use polars::prelude::*;

const CONNECTION_STRING: &str = "...";

pub fn test() -> Result<()> {
    let odbc_environment = Environment::new()?;

    let connection = odbc_environment
        .connect_with_connection_string(CONNECTION_STRING, ConnectionOptions::default())?;

    let cursor = connection
        .execute("SELECT * FROM Backcast_Power_Plant_Map", ())?
        .unwrap();

    let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;

    fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
        let schema = batch.schema();
        let mut columns = Vec::with_capacity(batch.num_columns());
        for (i, column) in batch.columns().iter().enumerate() {
            let arrow = Box::<dyn polars_arrow::array::Array>::from(&**column);
            columns.push(Series::from_arrow(
                &schema.fields().get(i).unwrap().name(),
                arrow,
            )?);
        }
        Ok(DataFrame::from_iter(columns))
    }

    for batch in arrow_record_batches {
        dbg!(record_batch_to_dataframe(&batch?));
    }

    Ok(())
}

请注意,这需要

polars-arrow
arrow_rs
功能作为依赖项。

据我所知,这并没有复制实际数据。

© www.soinside.com 2019 - 2024. All rights reserved.