如何通过 s3 存储桶读取 Delta Lake 表

问题描述 投票:0回答:1

从文档here可以看到,可以使用

deltalake::open_table
方法打开位于文件系统上的Delta Lake表。这我已经能做到了。

但是我一直无法弄清楚如何在 s3 上打开 Delta Lake 表。

可以在

here
找到deltalake::open_table的文档说

使用当前元数据从给定路径创建并加载 DeltaTable。从给定表路径中的方案推断要使用的存储后端。

我尝试将 url 传递到 s3 存储桶,因为文档说将推断存储后端。但这样做

open_table("s3://127.0.0.1:9000/deltalake/delta-table")
会导致恐慌

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: LoadCheckpoint { source: Storage { source: Generic { store: "S3", source: Error { retries: 0, message: "request error", source: Some(reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Ipv4(169.254.169.254)), port: None, path: "/latest/api/token", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 60, kind: TimedOut, message: "Operation timed out" })) }) } } } }', src/main.rs:71:10
stack backtrace:
   0: rust_begin_unwind

我有一种感觉这应该是微不足道的,但不幸的是事实并非如此:(

那么有人知道如何让 open_table 打开位于 s3 存储桶上的表吗?

rust delta-lake apache-arrow
1个回答
0
投票

我相信您可以使用

open_table_with_storage_options
来完成此操作,因为
AWS_ENDPOINT_URL
是选项之一
。我在这个要点中有一个 在 Rust 中阅读 S3 的工作示例,你可以调整它。

use deltalake::open_table_with_storage_options;
use std::{collections::HashMap, error::Error};
use aws_credential_types::provider::ProvideCredentials;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let table_uri = "s3://my-valid-table";

    // I am using a specific profile
    let sdk_config = aws_config::load_from_env().await;
    let cp = sdk_config
        .credentials_provider().expect("credentials");
    let creds = cp.provide_credentials().await?;

    let mut options = HashMap::new();
    options.insert("AWS_ACCESS_KEY_ID".into(), creds.access_key_id().to_string());
    options.insert("AWS_SECRET_ACCESS_KEY".into(), creds.secret_access_key().to_string());
    options.insert("AWS_SESSION_TOKEN".into(), creds.session_token().expect("got token").to_string());

    // you need to be psychic to know to do this https://github.com/delta-io/delta-rs/releases/tag/rust-v0.17.0
    deltalake::aws::register_handlers(None);

    let dt = open_table_with_storage_options(table_uri, options).await.expect("got table");
    println!("{:?} version: {}", table_uri, dt.version());

    Ok(())
}
© www.soinside.com 2019 - 2024. All rights reserved.