Connecting and authenticating to a Delta Lake on Azure Data Lake Storage Gen 2 with the delta-rs Python API

Question (votes: 0, answers: 2)

I am trying to use the delta-rs Python API to connect and authenticate to an existing Delta table in Azure Data Lake Storage Gen 2. I found the delta-rs library through this StackOverflow question: Delta Lake independent of Apache Spark?

However, the delta-rs documentation (https://delta-io.github.io/delta-rs/python/usage.html and https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variant.SasKey) is rather vague about how to authenticate and connect to Azure Data Lake Storage Gen 2. I could not find a clear example that demonstrates the required steps.

Can someone provide a step-by-step guide or an example of how to connect and authenticate to a Delta table on Azure Data Lake Storage Gen 2 with the delta-rs Python API?

python authentication azure-storage azure-data-lake delta-lake
2 Answers

1 vote

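You can use the following Python code to interact with a Delta Lake on Azure Data Lake Storage (ADLS), authenticating with a SAS token. The script opens an existing Delta table, prints its schema, file URIs, and history, then reads a CSV file from the same ADLS container and appends its contents to the table.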

First, make sure the required libraries are installed:

pip install deltalake pandas numpy

Then use this Python script:

import deltalake as dl
from deltalake.writer import write_deltalake
import pandas as pd
import numpy as np

# Define your SAS token, storage account name, container name, and file path
sas_token = "<please_generate_sas_token_using_a_stored_access_policy>"
storage_account_name = "mystorage"
container_name = "test-container"
csv_file = "test_delta/test_csv_data/products1.csv"
delta_path = "test_delta/light_delta_lake"
 
# HTTPS URL of the CSV file, with the SAS token appended as the query string
csv_url = f"https://{storage_account_name}.dfs.core.windows.net/{container_name}/{csv_file}?{sas_token}"
 
# Choose the protocol (abfs or abfss)
# https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri
protocol = "abfss"  # Use "abfs" for non-secure connections
 
# Construct the URL for the specified folder
delta_url = f"{protocol}://{container_name}@{storage_account_name}.dfs.core.windows.net/{delta_path}"
 
# Pass the SAS token as a storage option (it can also be set via an environment variable)
storage_options = {"SAS_TOKEN": sas_token}
 
print(csv_url.replace(sas_token, "<SECRET>"))
print(' ')
print(str(storage_options).replace(sas_token, "<SECRET>"))
print(delta_url.replace(sas_token, "<SECRET>"))

# Read the Delta table from the storage account 
dt = dl.DeltaTable(delta_url, storage_options=storage_options)
 
# Print the schema and file URIs of the Delta table
print(dt.schema())
print(dt.file_uris())
 
# Print the history of the Delta table as a DataFrame
print(pd.DataFrame(dt.history()))
 
# Read the CSV file, derive an integer "stars" column from "rating",
# drop the free-text columns, and cast "rating_count" to int32
data = (
    pd.read_csv(csv_url)
    .assign(stars=lambda df: df["rating"].astype(np.int32))
    .drop(["description", "ingredients"], axis=1)
    .astype({"rating_count": np.int32})
)
print(data.head())
 
# Append the DataFrame to the Delta table
write_deltalake(table_or_uri=dt, data=data, mode="append")
 
# Print the updated file URIs and history of the Delta table
print(dt.file_uris())
print(pd.DataFrame(dt.history()))
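
The URL construction and the secret masking used in the script above can be factored into two small helpers. This is only a sketch: `build_delta_url` and `redact` are hypothetical names that are not part of deltalake, and the account, container, and path values are the placeholders from the answer.

```python
def build_delta_url(container: str, account: str, path: str, protocol: str = "abfss") -> str:
    """Return an ADLS Gen 2 URL: protocol://container@account.dfs.core.windows.net/path."""
    if protocol not in ("abfs", "abfss"):
        raise ValueError(f"unsupported protocol: {protocol!r}")
    # Strip a leading slash so the result never contains "//" before the path.
    return f"{protocol}://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def redact(value: object, secret: str, mask: str = "<SECRET>") -> str:
    """Return str(value) with every occurrence of the secret replaced by the mask."""
    text = str(value)
    # An empty secret would make str.replace insert the mask between all characters.
    return text.replace(secret, mask) if secret else text


if __name__ == "__main__":
    sas_token = "sv=placeholder&sig=placeholder"  # placeholder, not a real token
    delta_url = build_delta_url("test-container", "mystorage", "test_delta/light_delta_lake")
    storage_options = {"SAS_TOKEN": sas_token}
    print(delta_url)
    print(redact(storage_options, sas_token))
```

The resulting `delta_url` and `storage_options` can then be passed to `dl.DeltaTable(delta_url, storage_options=storage_options)` exactly as in the script above.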

0 votes

The following code works, using Azure CLI credentials:

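from deltalake import DeltaTable

# ACCOUNT_NAME, CONTAINER, and FOLDER are placeholders for your own values
storage_options = {
    "azure_storage_account_name": ACCOUNT_NAME,
    "use_azure_cli": "true",  # reuse the credentials from `az login`
}
dt = DeltaTable("az://CONTAINER/FOLDER/", storage_options=storage_options)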

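I would like to be able to pass a credential object to the function, such as DefaultAzureCredential() from azure-identity or something similar. For now, it looks like separate implementations are needed for local testing and for running on a hosted service.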
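
One possible workaround for the wish above, as an untested sketch: fetch a bearer token yourself with azure-identity and hand it to delta-rs as a storage option. This assumes the `bearer_token` key listed in the object_store `AzureConfigKey` documentation linked in the question is accepted by your deltalake version. `build_storage_options` and `fetch_storage_token` are hypothetical helper names, not part of either library.

```python
from typing import Optional


def build_storage_options(account_name: str, bearer_token: Optional[str] = None) -> dict:
    """Build delta-rs storage options: a bearer token if given, otherwise Azure CLI login."""
    options = {"azure_storage_account_name": account_name}
    if bearer_token:
        # Assumption: "bearer_token" is the key described in the AzureConfigKey docs.
        options["bearer_token"] = bearer_token
    else:
        # Local development fallback, as in the answer above.
        options["use_azure_cli"] = "true"
    return options


def fetch_storage_token() -> str:
    """Get an Azure Storage access token through DefaultAzureCredential."""
    # Imported lazily so build_storage_options works without azure-identity installed.
    from azure.identity import DefaultAzureCredential

    credential = DefaultAzureCredential()
    return credential.get_token("https://storage.azure.com/.default").token


# Usage (requires deltalake, azure-identity, and a reachable storage account):
# from deltalake import DeltaTable
# options = build_storage_options(ACCOUNT_NAME, fetch_storage_token())
# dt = DeltaTable("az://CONTAINER/FOLDER/", storage_options=options)
```

Because access tokens expire, a long-running process would need to fetch a fresh token and rebuild the options before reopening the table.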
