使用 pyspark 将文件结构(包括文件)从一个存储增量复制到另一个存储

问题描述 投票:0回答:1

我有一个存储帐户 dexflex 和两个容器源和目标。 源容器有以下目录和文件:

results  
    search
        03
            Module19111.json
            Module19126.json
        04
            Module11291.json
            Module19222.json
    product
        03
            Module18867.json
            Module182625.json
        04
            Module122251.json
            Module192287.json

我正在尝试使用以下代码片段将数据从源容器增量复制到目标容器

from datetime import datetime, timedelta
from pyspark.sql import SparkSession


# Set up the source and destination storage account configurations
source_account_name = "dev-stor"
source_container_name = "results"
destination_account_name = "dev-stor"
destination_container_name = "results"

# Set up the source and destination paths
source_path = f"abfss://{source_container_name}@{source_account_name}.dfs.core.windows.net/{search,product}/"
destination_path = f"abfss://{destination_container_name}@{destination_account_name}.dfs.core.windows.net/copy-data-2024"

# Set up the date range for incremental copy
start_date = datetime(2024, 3, 1)
end_date = datetime(2999, 12, 12)

dbutils.fs.cp(source_path, destination_path, recurse=True)

上面的代码是完整副本,但我更倾向于增量复制,即在下次运行中仅复制新文件。

PS。目录层次结构是相同的。

我也尝试过自动加载器,但无法维持相同的分层目录结构。

我可以得到一些专家的建议吗

pyspark azure-databricks
1个回答
0
投票

使用 Azure 数据工厂复制活动,您可以将文件从源增量复制到目标。在那里,您将找到复制行为的选项,允许您根据文件的上次修改日期和时间选择增量复制。

有关更多信息,请参阅文档。

如果您不想使用 Azure 数据工厂并且仅限于使用 Databricks,则需要创建一个在运行复制脚本时保存时间戳的视图。当您再次运行它时,它将复制上次运行脚本后修改或添加的文件。

使用以下代码:

from pyspark.sql.functions import lit
import datetime

start_date = int(datetime.datetime(2024, 3, 1).timestamp()) * 1000
data = [(start_date,)]
df = spark.createDataFrame(data, ["lastmodifiedcopy"])
df.createOrReplaceTempView("CPY_TBL")

使用以下代码进行增量加载:

def list_files_recursively(folder_path):
    file_paths = []
    contents = dbutils.fs.ls(folder_path)
    for item in contents:
        if item.isDir():
            file_paths.extend(list_files_recursively(item.path))
        else:
            file_paths.append(item)
    return file_paths

def incremental_copy(source_path, destination_path, last_copied_time):
    files = list_files_recursively(source_path)
    print(f"Last copied time: {datetime.datetime.fromtimestamp(last_copied_time / 1000).strftime('%d-%m-%Y %H:%M:%S')}")
    for file in files:
        if file.modificationTime >= last_copied_time:
            destination_file_path = destination_path + file.path.replace(source_path, '')
            print(destination_file_path)
            dbutils.fs.cp(file.path, destination_file_path)

    current_time = int(datetime.datetime.now().timestamp()) * 1000
    data = [(current_time,)]
    df = spark.createDataFrame(data, ["lastmodifiedcopy"])
    df.createOrReplaceTempView("CPY_TBL")

复制文件后,将临时视图更新为当前时间,以供以后的文件复制再次使用。

像这样调用函数:

last_copied_time = spark.sql("select * from CPY_TBL").collect()[0][0]
incremental_copy(source_path, destination_path, last_copied_time)

输出:

第一轮

enter image description here

上传一些数据后第二次运行。

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.