Spark.read()一次生成多个路径,而不是在for循环中一一对应

问题描述 投票:0回答:1

我正在运行以下代码:

list_of_paths是一个列表,其路径以avro文件结尾。例如,

['folder_1/folder_2/0/2020/05/15/10/41/08.avro', 'folder_1/folder_2/0/2020/05/15/11/41/08.avro', 'folder_1/folder_2/0/2020/05/15/12/41/08.avro']

注意:上面的路径存储在Azure Data Lake存储中,下面的过程在Databricks中执行

spark.conf.set("fs.azure.account.key.{0}.dfs.core.windows.net".format(storage_account_name), storage_account_key)
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
begin_time = time.time()

for i in range(len(list_of_paths)):

    try:
      read_avro_data,avro_decoded=None,None

      #Read paths from Azure Data Lake "abfss"
      read_avro_data=spark.read.format("avro").load("abfss://{0}@{1}.dfs.core.windows.net/{2}".format(storage_container_name, storage_account_name, list_of_paths[i]))

    except Exception as e:
      custom_log(e)

模式

read_avro_data.printSchema()

root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true) 
# this is the content of the AVRO file.

行数和列数

print("ROWS: ", read_avro_data.count(), ", NUMBER OF COLUMNS: ", len(read_avro_data.columns))

ROWS:  2 , NUMBER OF COLUMNS:  6

我想要的是not每次迭代读取1个AVRO文件,因此一次迭代读取2行内容。相反,我想一次读取所有AVRO文件。所以2x3 = 6行内容在我的最终Spark DataFrame中。

spark.read()可行吗?类似于以下内容:

spark.read.format("avro").load("abfss://{0}@{1}.dfs.core.windows.net/folder_1/folder_2/0/2020/05/15/*")

感谢您的帮助和建议。

python apache-spark pyspark databricks azure-data-lake
1个回答
1
投票

load(path=None, format=None, schema=None, **options)此方法将接受单个路径或路径列表。

例如,您可以直接传递如下所示的路径列表

spark.read.format("avro").load(["/tmp/dataa/userdata1.avro","/tmp/dataa/userdata2.avro"]).count()

1998

© www.soinside.com 2019 - 2024. All rights reserved.