在 Databricks 中使用 AutoLoader 时是否可以不合并镶木地板?

问题描述 投票:0回答:1

在 Databricks 中使用 AutoLoader 时是否可以不合并镶木地板? 问题是,我想将数据从 S3 存储桶直接复制到 Azure Blob 存储,而不合并它。只需从 S3 到 Azure Blob 的 1:1 复制

这是我的代码:


cda_path = f'{planet}/{center}/{deployment}'

streams = list()

for table_name in tables:
    streams.append(
        (table_name, spark.readStream.format("cloudFiles")\
        .option("cloudFiles.format", "parquet")\
        .option('cloudFiles.schemaLocation', f'dbfs:/FileStore/shared_uploads/checkpints/stream_{table_name}')\
        .option('cloudFiles.schemaEvolutionMode', 'rescue')\
        .load(f'dbfs:/mnt/gwcp/{cda_path}/{table_name}/*'))
        )

for table_name, stream in streams:
    blob_container = "databricks-container" 
    blob_output_path = f"/mnt/test_mount_databricks/{planet}/{center}/{deployment}/{table_name}_test"

    stream.writeStream\
        .format("parquet")\
        .outputMode("append")\
        .option("checkpointLocation", f"dbfs:/FileStore/shared_uploads/checkpoints/blob_{table_name}_test")\
        .option("mergeSchema", "false")\
        .start(blob_output_path)

此代码有效,但作为输出,它给出具有多行的镶木地板,例如 100。“源镶木地板”只有 1 到 2 行。正如你所看到的,我尝试将“mergeSchema”选项设置为 false,但没有成功。我看不到任何关于这个东西的主题,也没有在 Databricks 文档或谷歌中找到任何内容。

谢谢!

databricks azure-databricks aws-databricks databricks-autoloader
1个回答
0
投票

是的,我认为您可以实现从 S3 到 Azure Blob 存储的 1:1 复制,而无需使用 Databricks 中的 AutoLoader 合并 Parquet 文件。确保 Azure Blob 中的输出文件与源 Parquet 文件保持相同的结构和大小。

下面是更新的代码,尝试一下,

cda_path = f'{planet}/{center}/{deployment}'

streams = []

for table_name in tables:
    streams.append(
        (table_name, spark.readStream.format("cloudFiles")
         .option("cloudFiles.format", "parquet")
         .option('cloudFiles.schemaLocation', f'dbfs:/FileStore/shared_uploads/checkpints/stream_{table_name}')
         .option('cloudFiles.schemaEvolutionMode', 'rescue')
         .load(f'dbfs:/mnt/gwcp/{cda_path}/{table_name}/*'))
    )

for table_name, stream in streams:
    blob_output_path = f"wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/{planet}/{center}/{deployment}/{table_name}_test"

    stream.writeStream\
        .format("parquet")
        .outputMode("append")
        .option("checkpointLocation", f"dbfs:/FileStore/shared_uploads/checkpoints/blob_{table_name}_test")
        .start(blob_output_path)

© www.soinside.com 2019 - 2024. All rights reserved.