我有一个存储帐户 dexflex 和两个容器源和目标。 源容器有以下目录和文件:
results
search
03
Module19111.json
Module19126.json
04
Module11291.json
Module19222.json
product
03
Module18867.json
Module182625.json
04
Module122251.json
Module192287.json
我正在尝试使用以下代码片段将数据从源容器增量复制到目标容器
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
# Set up the source and destination storage account configurations
source_account_name = "dev-stor"
source_container_name = "results"
destination_account_name = "dev-stor"
destination_container_name = "results"
# Set up the source and destination paths
source_path = f"abfss://{source_container_name}@{source_account_name}.dfs.core.windows.net/{search,product}/"
destination_path = f"abfss://{destination_container_name}@{destination_account_name}.dfs.core.windows.net/copy-data-2024"
# Set up the date range for incremental copy
start_date = datetime(2024, 3, 1)
end_date = datetime(2999, 12, 12)
dbutils.fs.cp(source_path, destination_path, recurse=True)
上面的代码是完整副本,但我更倾向于增量复制,即在下次运行中仅复制新文件。
PS。目录层次结构是相同的。
我也尝试过自动加载器,但无法维持相同的分层目录结构。
我可以得到一些专家的建议吗
使用 Azure 数据工厂复制活动,您可以将文件从源增量复制到目标。在那里,您将找到复制行为的选项,允许您根据文件的上次修改日期和时间选择增量复制。
有关更多信息,请参阅此文档。
如果您不想使用 Azure 数据工厂并且仅限于使用 Databricks,则需要创建一个在运行复制脚本时保存时间戳的视图。当您再次运行它时,它将复制上次运行脚本后修改或添加的文件。
使用以下代码:
from pyspark.sql.functions import lit
import datetime
start_date = int(datetime.datetime(2024, 3, 1).timestamp()) * 1000
data = [(start_date,)]
df = spark.createDataFrame(data, ["lastmodifiedcopy"])
df.createOrReplaceTempView("CPY_TBL")
使用以下代码进行增量加载:
def list_files_recursively(folder_path):
file_paths = []
contents = dbutils.fs.ls(folder_path)
for item in contents:
if item.isDir():
file_paths.extend(list_files_recursively(item.path))
else:
file_paths.append(item)
return file_paths
def incremental_copy(source_path, destination_path, last_copied_time):
files = list_files_recursively(source_path)
print(f"Last copied time: {datetime.datetime.fromtimestamp(last_copied_time / 1000).strftime('%d-%m-%Y %H:%M:%S')}")
for file in files:
if file.modificationTime >= last_copied_time:
destination_file_path = destination_path + file.path.replace(source_path, '')
print(destination_file_path)
dbutils.fs.cp(file.path, destination_file_path)
current_time = int(datetime.datetime.now().timestamp()) * 1000
data = [(current_time,)]
df = spark.createDataFrame(data, ["lastmodifiedcopy"])
df.createOrReplaceTempView("CPY_TBL")
复制文件后,将临时视图更新为当前时间,以供以后的文件复制再次使用。
像这样调用函数:
last_copied_time = spark.sql("select * from CPY_TBL").collect()[0][0]
incremental_copy(source_path, destination_path, last_copied_time)
输出:
第一轮
上传一些数据后第二次运行。