pyspark - 如何使用获得的凭据从 ADLS 读取

问题描述 投票:0回答:1

已编辑

我能够使用 pandas 和 InteractiveBrowserCredential() 读取 ADLS Gen 2 上的文件

#### abfs read test
# from azure.identity import ClientSecretCredential
# from azure.identity import DefaultAzureCredential
from azure.identity import InteractiveBrowserCredential
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.resource import SubscriptionClient
from adlfs import AzureBlobFileSystem
import pandas as pd
import yaml
import os
from pyspark.sql import SparkSession

with open('conf/local/credentials.yml', 'r') as file:
    creds = yaml.safe_load(file)
storage_account_name = creds['storage_account_name']
tenant_id = creds['tenant_id']
file_system = 'project-data' # container
file_path = "myfile.txt.gz"

# authenticating
credential = InteractiveBrowserCredential()
subscription_client = SubscriptionClient(credential)
subscription = next(subscription_client.subscriptions.list()) # launches interactive window
print(subscription.subscription_id)
print(subscription.tenant_id) 

dir(credential)
credential._client_id
credential._authority
credential._tenant_id
### authentication successful

现在,使用正确的参数使用 pandas 读取会成功

# reading pandas
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
    "https", storage_account_name), credential=credential)

file_system_client = service_client.get_file_system_client(file_system)
file_client = file_system_client.get_file_client(file_path)

abfs = AzureBlobFileSystem(account_name=storage_account_name, credential=credential)
with abfs.open(f"{file_system}/{file_path}", "rb") as f:
    df = pd.read_csv(f, compression="gzip", sep="\t", header=None, encoding="latin1") # OK******

我的问题是,如何做完全相同的事情 - 即使用已经获得的

credential
对象,但使用 Spark?

ff = abfs.open(f"{file_system}/{file_path}", "rb")
spark.read.format("csv").load(ff.full_name)

给予

py4j.protocol.Py4JJavaError: An error occurred while calling o38.load.
: abfs://project-data/myfile.txt.gz has invalid authority.

file=f"abfss://project-data@{storage_account_name}.dfs.core.windows.net/{file_path}"
xx = spark.read.format("csv").load(file)

# py4j.protocol.Py4JJavaError: An error occurred while calling o56.load.
# : Failure to initialize configuration
azure pyspark azure-blob-storage
1个回答
0
投票

对于 Spark,您需要将文件路径传递给加载函数,如下所示,以从存储帐户读取文件:

<protocol>://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-data>

协议将是

abfss
wasbs

对于授权,应配置以下其中一项:

要使用上述协议,在使用Azure VM基础环境时,还需要一些包,例如hadoop-azureazure-storage

但是,您正在经过如下路径:

abfs://project-data/myfile.txt.gz

仅在

adlfs
中受支持。

您可以尝试以下方法之一:

  1. ff
    中读取的数据复制到本地文件并加载。
with abfs.open(f"{file_system}/{file_path}", "r") as f:
    with open("/tmp_folder/tmp.csv",'w') as f2:
        f2.write(f.read())
        
spark.read.csv("/tmp_folder/tmp.csv").show()

输出:

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| NULL|       S|
  1. 如果你已经有了
    adlfs
    包,你可以直接在pandas中读取它并将其转换为Spark DataFrame。
storage_options = {'account_name':'jadls2','account_key': key}
#storage_options={'tenant_id': TENANT_ID, 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET}
#path='abfs://{CONTAINER}/{FOLDER}/*.csv'
ddf = pd.read_csv('abfs://data/pdf/titanic.csv', storage_options=storage_options)
spark.createDataFrame(ddf).show()

输出:

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|          330877| 8.4583| NULL|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|           17463|51.8625|  E46|       S|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1|          349909| 21.075| NULL|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| NULL|       S|

您可以了解更多关于

adlfs
这里

© www.soinside.com 2019 - 2024. All rights reserved.