是否可以使用Azure Durable Functions来“替换”Azure Synapse Analytics中的管道

问题描述 投票:0回答:1

我们目前正在使用 Azure Synapse Analytics 从链接服务读取数据并将其存储在 DLH 中。没有更多的事情发生。我的老板要求研究耐用功能,看看我们是否可以在该过程中使用它们,以节省管道费用。

我现在陷入困境,因为我设法将数据从一个 blob 复制到另一个 blob,但尚未找到一种使用 azure 函数从链接服务读取数据的方法。

有办法做到这一点吗?使用 Azure Durable Functions 代替 管道是否有意义?

azure azure-functions analytics apache-synapse
1个回答
0
投票

是的,使用 Azure synapse 管道或复制工具将数据从源链接服务移动到接收器链接服务是理想的方法。由于 Synapse 和数据工厂内置了连接器,使数据复制工作变得更加容易。但持久功能并不适合处理具有数百万行的大规模数据。 Azure synapse 也是理想的数据仓库解决方案。但您可以创建 Azure 持久函数并执行将数据从一个 Blob 传输到另一个 Blob 的活动。您可以使用 blob 触发器创建普通函数,只要新文件上传到 Azure 存储 blob 容器中,Blob 触发器函数就会触发。然后,您可以通过将输出绑定添加到目标 blob 来执行将上传的 blob 复制到另一个 blob 的任务。您还可以创建基本的 Http 触发器并调用 blob 客户端将 blob 从一个容器复制到另一个容器,或从一个存储帐户复制到另一个容器。

此外,要与链接服务交互,您需要调用该客户端特定的 sdk。

使用Blob触发器功能:-

init.py:-

import logging
import azure.functions as func

def main(myblob: func.InputStream, outputBlob: func.Out[str]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")
    outputBlob.set(myblob.read())

function.json:-

添加了用于获取 blob 的输入绑定和用于复制 blob 的输出绑定:-


{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "data/{name}",
      "connection": "siddheshstrg_STORAGE"
    },
    {
      "type": "blob",
      "direction": "out",
      "name": "outputBlob",
      "path": "copy-data/{rand-guid}",
      "connection": "siddheshstrg_STORAGE"
    }
  ]
}

输出:-

enter image description here

已在源 blob 中上传文件:-

enter image description here

源 blob 文件复制到目标 blob:-

enter image description here

使用耐用功能触发器:-

import logging
from azure.storage.blob import BlobServiceClient, BlobClient
import azure.functions as func
from azure.durable_functions import DurableOrchestrationContext, Orchestrator


def orchestrator_function(context: DurableOrchestrationContext):
    source_container = "data"
    source_blob_name = "7xxxx.html"
    destination_container = "copy-data"
    destination_blob_name = "test.html"
    copy_result = yield context.call_activity("CopyBlobActivity", {
        "source_container": source_container,
        "source_blob_name": source_blob_name,
        "destination_container": destination_container,
        "destination_blob_name": destination_blob_name
    })

    return copy_result

main = Orchestrator.create(orchestrator_function)

def copy_blob_activity(context: DurableOrchestrationContext, input_data: dict) -> str:
    source_container = input_data["source_container"]
    source_blob_name = input_data["source_blob_name"]
    destination_container = input_data["destination_container"]
    destination_blob_name = input_data["destination_blob_name"]

    source_connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxxxrg;AccountKey=dxxxxxxxXdtA==;EndpointSuffix=core.windows.net"
    destination_connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxxxhstrg;AccountKey=dxxxxxxxEG+AStYRXdtA==;EndpointSuffix=core.windows.net"
    
    source_blob_service_client = BlobServiceClient.from_connection_string(source_connection_string)
    destination_blob_service_client = BlobServiceClient.from_connection_string(destination_connection_string)

    source_blob_client = source_blob_service_client.get_blob_client(container=source_container, blob=source_blob_name)

    blob_data = source_blob_client.download_blob().readall()

    destination_blob_client = destination_blob_service_client.get_blob_client(container=destination_container, blob=destination_blob_name)
    destination_blob_client.upload_blob(blob_data, overwrite=True)

    return f"Blob '{source_blob_name}' copied to '{destination_container}/{destination_blob_name}'"
© www.soinside.com 2019 - 2024. All rights reserved.