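To test Rust on real data, I want to connect to an ODBC database. I have made some progress, but I am struggling to retrieve the output of the cursor.
So far, the best I have managed is printing "Hurray!" 200 times, which is not what I want. I would be very grateful if someone could show me how to push the data into a Polars DataFrame. Does anyone know how I can do that?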
fn main() {
    use odbc_api::{ConnectionOptions, Cursor, Environment};
    let connection_string = connection_string();
    let env = Environment::new().unwrap();
    let conn = env
        .connect_with_connection_string(&connection_string, ConnectionOptions::default())
        .unwrap();
    let result_set = conn
        .execute("SELECT TOP 200 * FROM pub.table", ())
        .unwrap();
    if let Some(mut cursor) = result_set {
        loop {
            match cursor.next_row() {
                Ok(Some(_cursor_row)) => {
                    println!("Hurray!")
                }
                Ok(None) => {
                    break;
                }
                Err(err) => {
                    eprintln!("Error while fetching next row: {:?}", err);
                    break;
                }
            }
        }
    } else {
        eprintln!("Result set is None");
    }
}
I found a workable solution:
use anyhow::Error;
use odbc_api::{buffers::TextRowSet, ConnectionOptions, Cursor, Environment, ResultSetMetadata};
use polars::error::PolarsResult;
use polars::frame::DataFrame;
use polars::prelude::*;
use std::{
    ffi::CStr,
    fs::File,
    io::{stdout, Write},
    path::PathBuf,
    time::Instant,
};
fn connection_string() -> String {
    // The connection details were left blank in the original post.
    // Supply your own values; `String::from()` needs an argument to compile.
    let hostname = String::from();
    let port = String::from();
    let database = String::from();
    let username = String::from();
    let password = String::from();
    let driver = String::from("{Progress OpenEdge 11.7 Driver}");
    format!(
        "DRIVER={};HostName={};DATABASENAME={};PORTNUMBER={};LogonID={};PASSWORD={}",
        driver, hostname, database, port, username, password
    )
}
const BATCH_SIZE: usize = 100_000;
fn extract_data() -> Result<(), Error> {
    // Write to output.csv, the file that dataframe_creator() reads back.
    let out = File::create("output.csv")?;
    let mut writer = csv::Writer::from_writer(out);
    let connection_string = connection_string();
    let env = Environment::new()?;
    let conn =
        env.connect_with_connection_string(&connection_string, ConnectionOptions::default())?;
    match conn.execute("SELECT top 200 * FROM pub.table", ())? {
        Some(mut cursor) => {
            // The column names become the CSV header row
            let headline: Vec<String> = cursor.column_names()?.collect::<Result<_, _>>()?;
            writer.write_record(headline)?;
            // Text buffers holding up to BATCH_SIZE rows, at most 4096 bytes per cell
            let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &mut cursor, Some(4096))?;
            let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;
            while let Some(batch) = row_set_cursor.fetch()? {
                // Within a batch, iterate over every row
                for row_index in 0..batch.num_rows() {
                    // Within a row, iterate over every column
                    let record = (0..batch.num_cols())
                        .map(|col_index| {
                            // NULL cells are written as empty strings
                            let column_data = batch.at(col_index, row_index).unwrap_or(&[]);
                            // Replace invalid UTF-8 sequences instead of failing
                            String::from_utf8_lossy(column_data).to_string()
                        })
                        .collect::<Vec<String>>();
                    // Write the row as CSV
                    writer.write_record(&record)?;
                }
            }
        }
        None => {
            eprintln!("Query came back empty. No output has been created.");
        }
    }
    // Make sure everything is on disk before the file is read again
    writer.flush()?;
    Ok(())
}
fn dataframe_creator() -> PolarsResult<DataFrame> {
    CsvReader::from_path("output.csv")?
        .has_header(true)
        .finish()
}
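fn main() {
    let now = Instant::now();
    // Report extraction errors instead of silently discarding them
    if let Err(err) = extract_data() {
        eprintln!("Extraction failed: {:?}", err);
        return;
    }
    let pl_df = dataframe_creator();
    println!("{:#?}", pl_df);
    let elapsed = now.elapsed();
    println!("Elapsed {:#?}", elapsed);
}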
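Unfortunately, I have to export the data to a CSV file and then import it into a DataFrame, which means the equivalent Python script runs faster. I hope someone can help me improve my script.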
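One direction that might avoid the CSV round trip is to collect the fetched cells column by column in memory, since Polars builds a DataFrame from columns rather than rows. The following is only a minimal sketch of that idea using the standard library; `ColumnStore`, `push_cell`, `num_rows` and `into_named_columns` are hypothetical names introduced here for illustration, not part of odbc-api or Polars. It assumes each cell arrives as an `Option<&[u8]>`, which is what `batch.at(col_index, row_index)` yields in the code above.

```rust
/// Hypothetical column-oriented accumulator for text cells fetched in batches.
struct ColumnStore {
    names: Vec<String>,
    columns: Vec<Vec<Option<String>>>,
}

impl ColumnStore {
    /// Create one empty column per column name.
    fn new(names: Vec<String>) -> Self {
        let columns = names.iter().map(|_| Vec::new()).collect();
        Self { names, columns }
    }

    /// Append one cell to a column. `None` models SQL NULL and is kept
    /// distinct from an empty string.
    fn push_cell(&mut self, col_index: usize, cell: Option<&[u8]>) {
        // Invalid UTF-8 sequences are replaced rather than causing an error.
        let value = cell.map(|bytes| String::from_utf8_lossy(bytes).into_owned());
        self.columns[col_index].push(value);
    }

    /// Number of rows stored so far (length of the first column).
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, Vec::len)
    }

    /// Pair every column name with its values.
    fn into_named_columns(self) -> Vec<(String, Vec<Option<String>>)> {
        self.names.into_iter().zip(self.columns).collect()
    }
}
```

Inside the fetch loop, a call such as `store.push_cell(col_index, batch.at(col_index, row_index))` could take the place of building a CSV record. Each `(name, values)` pair could then presumably be turned into a Polars series (for example with `Series::new`) and the series combined with `DataFrame::new`; the exact signatures depend on the Polars version, so treat that last step as an assumption to check against the documentation. All columns would still be strings at that point and would need casting to numeric or date types afterwards.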