My current solution reads the base64 string and writes it out as a file to blob storage:
import base64

from azure.storage.blob import BlobServiceClient
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

# Initialize Azure Blob Service Client
connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=xxxxxxx;EndpointSuffix=core.windows.net"  # Replace with your connection string
container_name = "sandpit/Attachments"  # Replace with your container name
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

def write_file_to_blob(data, filename):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=filename)
    blob_client.upload_blob(data, overwrite=True)

# UDF to decode base64
def decode_base64(base64_str):
    return base64.b64decode(base64_str)

# Register UDF
decode_udf = udf(decode_base64, BinaryType())
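For context, the df_with_decoded_data used below is presumably produced by applying this UDF to the source DataFrame. A minimal sketch, assuming the base64 payload lives in a column named Body (the column name is hypothetical, adjust to the actual schema):

# Hypothetical column name "Body"; produces the binary "DecodedData" column
df_with_decoded_data = df.withColumn("DecodedData", decode_udf(df["Body"]))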
and it is then called as follows:
collected_data = df_with_decoded_data.collect()

# Write each file to blob storage
for row in collected_data:
    write_file_to_blob(row['DecodedData'], row['FinalFileName'])
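As an aside, collect() pulls every decoded attachment into driver memory. If the volume grows, the uploads can instead run on the executors; a sketch using foreachPartition, assuming azure-storage-blob is installed on the executors:

def upload_partition(rows):
    # Create the client inside the task; the driver's client cannot be
    # serialized out to executors
    client = BlobServiceClient.from_connection_string(connection_string)
    for row in rows:
        blob_client = client.get_blob_client(container=container_name, blob=row["FinalFileName"])
        blob_client.upload_blob(row["DecodedData"], overwrite=True)

df_with_decoded_data.foreachPartition(upload_partition)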
Now I want to move this to OneLake. How do I establish a connection to OneLake files/folders and perform this task?
What kind of credentials does OneLake require for authentication?
I managed to get this working because the user account running the notebook has full access to the /Files directory. Since this was a one-off task, I didn't go any further with integrating a service principal or managed identity.
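For anyone who does need unattended authentication: OneLake exposes an ADLS Gen2-compatible endpoint (https://onelake.dfs.fabric.microsoft.com), so the azure-storage-file-datalake SDK works with any Microsoft Entra credential that has access to the workspace. A minimal sketch using a service principal; the tenant/app values, workspace, and lakehouse names are placeholders, and the Fabric tenant must allow service principals to call its APIs:

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

# Placeholder credentials; the service principal must be granted access
# to the Fabric workspace
credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
)

# OneLake's ADLS Gen2-compatible endpoint: the file system is the
# workspace name, and paths start at the lakehouse
service_client = DataLakeServiceClient(
    "https://onelake.dfs.fabric.microsoft.com", credential=credential
)
file_system_client = service_client.get_file_system_client("<workspace-name>")

def write_file_to_onelake(data, filename):
    file_client = file_system_client.get_file_client(
        f"<lakehouse-name>.Lakehouse/Files/attachments/{filename}"
    )
    file_client.upload_data(data, overwrite=True)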
The following code works well for me:
import os

collected_data = df_with_decoded_data.collect()

base_dir = "/lakehouse/default/Files/attachments"  # File API path

# Ensure the base directory exists
os.makedirs(base_dir, exist_ok=True)

# Write the decoded bytes to files
for item in collected_data:
    # Construct the filename from AttachmentId plus the last four
    # characters of FileName (i.e. the extension, such as ".pdf")
    filename = item["AttachmentId"] + item["FileName"][-4:]

    # Full path for the file
    file_path = os.path.join(base_dir, filename)

    # Ensure the directory for the file exists (in case the filename includes subdirectories)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Write the decoded content to the file
    with open(file_path, "wb") as file:
        file.write(item["DecodedData"])
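One caveat on this approach: collect() materializes every decoded attachment on the driver at once, and as far as I can tell the /lakehouse/default mount is only available on the driver node, so these writes cannot simply be pushed to executors. If memory becomes a concern, a sketch that streams rows to the driver instead of collecting them all up front:

# Stream rows one partition at a time rather than collecting everything
for item in df_with_decoded_data.toLocalIterator():
    filename = item["AttachmentId"] + item["FileName"][-4:]
    file_path = os.path.join(base_dir, filename)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as file:
        file.write(item["DecodedData"])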