我经常需要从 Microsoft SQL 服务器获取大量数据,以便使用 Rust 中的 Polars 进行操作,并且根据企业安全策略或多或少被迫使用 ODBC 进行这些连接。 ODBC 要求限制我使用 ConnectorX 等成熟且功能齐全的库。我能够使用 arrow_odbc 连接并高效地将查询结果读取到 Arrow 中的 RecordBatch 对象中,但无法将这些 RecordBatch 对象转换为 Polars DataFrames。
因为
RecordBatch
和 Series
的实际数据组件具有相同的底层表示,我认为可以从 DataFrame
零拷贝创建 RecordBatch
。
但是在
columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);
我收到错误:
mismatched types
expected struct `std::boxed::Box<(dyn polars::export::polars_arrow::array::Array + 'static)>`
found struct `Arc<dyn arrow::array::Array>`
我的印象是
Arc<dyn Array>
就是ArrayRef
,真正的问题也许是我有一个Arc<dyn arrow::array::Array>
并且Series::from_arrow()
期待着PolarsArc<Array>
?如果是这样,我该如何解决?
我的完整代码如下供参考。
use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
use arrow::record_batch::RecordBatch;
use polars::prelude::*;
use anyhow::Result;
const CONNECTION_STRING: &str = "...";
pub fn test() -> Result<()> {
let odbc_environment = Environment::new()?;
let connection = odbc_environment.connect_with_connection_string(
CONNECTION_STRING,
ConnectionOptions::default()
)?;
let cursor = connection.execute("SELECT * FROM Backcast_Power_Plant_Map", ())?.unwrap();
let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;
fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
let schema = batch.schema();
let mut columns = Vec::with_capacity(batch.num_columns());
for (i, column) in batch.columns().iter().enumerate() {
columns.push(Series::from_arrow(&schema.fields().get(i).unwrap().name(), *column)?);
}
Ok(DataFrame::from_iter(columns))
}
for batch in arrow_record_batches {
dbg!(record_batch_to_dataframe(&batch?));
}
Ok(())
}
看来
polars
和arrow-odbc
使用不同的箭头箱:polars
使用polars-arrow
,arrow-odbc
使用arrow
。前者的数组类型是Box<dyn polars_arrow::array::Array>
,而后者的类型是ArrayRef
,它是Arc<dyn arrow::array::Array>
的别名。
对我们来说幸运的是,
polars-arrow
板条箱中存在一个兼容层。您可以通过 From
impls 在两种类型(以及更多类型)之间进行转换:
use anyhow::Result;
use arrow::record_batch::RecordBatch;
use arrow_odbc::{
odbc_api::{ConnectionOptions, Environment},
OdbcReaderBuilder,
};
use polars::prelude::*;
const CONNECTION_STRING: &str = "...";
pub fn test() -> Result<()> {
let odbc_environment = Environment::new()?;
let connection = odbc_environment
.connect_with_connection_string(CONNECTION_STRING, ConnectionOptions::default())?;
let cursor = connection
.execute("SELECT * FROM Backcast_Power_Plant_Map", ())?
.unwrap();
let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?;
fn record_batch_to_dataframe(batch: &RecordBatch) -> Result<DataFrame, PolarsError> {
let schema = batch.schema();
let mut columns = Vec::with_capacity(batch.num_columns());
for (i, column) in batch.columns().iter().enumerate() {
let arrow = Box::<dyn polars_arrow::array::Array>::from(&**column);
columns.push(Series::from_arrow(
&schema.fields().get(i).unwrap().name(),
arrow,
)?);
}
Ok(DataFrame::from_iter(columns))
}
for batch in arrow_record_batches {
dbg!(record_batch_to_dataframe(&batch?));
}
Ok(())
}
请注意,这需要
polars-arrow
和 arrow_rs
功能作为依赖项。
据我所知,这并没有复制实际数据。