Sending data to Data Lake Storage


I am trying to save the JSON that an ESP32 sends to IoT Hub. The JSON message looks like this:

' { "msgCount": 0, "message": "hello world", "id": "test1", "SoundCategory": "rue", "SoundName": "pelleteuse", "dB": "105 dB", "Duration": "40 min", "StartTime": "04-06-2024:17h24", "EndTime": "04-06-2024:18h04", "Location": "[100.00,0.00]" }'

I have written a Python Azure Function that is triggered every time telemetry is received; so far it has been working:

import logging
import azure.functions as func
import base64


app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
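
As a side note, if the device really sends UTF-8 encoded JSON like the sample above, the decoded bytes can be parsed back into a dictionary so the individual fields (msgCount, id, dB, ...) become usable. A minimal sketch; the helper name parse_telemetry is only for illustration:

import json
import logging

def parse_telemetry(payload_bytes: bytes) -> dict:
    # Assumption: the ESP32 sends UTF-8 encoded JSON such as the sample message above
    telemetry = json.loads(payload_bytes.decode("utf-8"))
    logging.info("msgCount=%s id=%s dB=%s",
                 telemetry.get("msgCount"), telemetry.get("id"), telemetry.get("dB"))
    return telemetry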

The problem is that I now want to send it to my Data Lake Storage, but whenever I deploy the function to Azure with the Data Lake Storage connection, the function disappears, and it is hard to get it to deploy at all. I don't know whether my approach is correct, or why none of my attempts work.

These are the two modules the problem seems to come from:

from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential

As soon as I merely import those modules, my code disappears from the Function App:

import logging
import azure.functions as func
import base64
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential

app = func.FunctionApp()


@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
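
For what it's worth, a function that "disappears" from the Function App right after such a deployment is usually a sign that the Python worker failed to index function_app.py, most often because an imported package is missing from requirements.txt; the import then raises at startup and no functions get registered. Assuming the imports above, requirements.txt would need at least:

azure-functions
azure-storage-file-datalake
azure-identity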

Here are the other variants I tried, without success. The first one:

import logging
import azure.functions as func
import json 
import base64
import os
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime

app = func.FunctionApp()

storage_account_name = "mystorageaccountname"
directory_name = datetime.now().strftime('%Y%m%d')

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        
        # Initialize Azure Data Lake Storage client
        storage_client = initialize_storage_account()
        
        # Create file system dynamically
        file_system_client = create_file_system(storage_client)

        # Create directory within the file system
        directory_client = create_directory(file_system_client, directory_name)

        # Upload payload bytes to the created directory
        upload_file_to_directory(directory_client, local_path='', file_name='payload_file.bin', data=payload_bytes)

    else:
        logging.warning('No payload found in the event.')

def initialize_storage_account():
    try:
        credential = DefaultAzureCredential()
        service_client = DataLakeServiceClient(account_url=f"https://{storage_account_name}.dfs.core.windows.net", credential=credential)
        return service_client
    except Exception as e:
        logging.error(f"Error initializing storage account: {e}")

def create_file_system(service_client: DataLakeServiceClient) -> FileSystemClient:
    try:
        file_system_name = datetime.now().strftime('%Y%m%d%H%M%S')  # Generate a unique file system name
        file_system_client = service_client.create_file_system(file_system=file_system_name)
        logging.info(f"File system '{file_system_name}' created successfully.")
        return file_system_client
    except Exception as e:
        logging.error(f"Error creating file system: {e}")

def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)

    return directory_client

def upload_file_to_directory(directory_client: DataLakeDirectoryClient, local_path: str, file_name: str, data: bytes):
    file_client = directory_client.get_file_client(file_name)

    # Upload the decoded payload bytes directly instead of reading a local file
    file_client.upload_data(data, overwrite=True)

And the second variant:

import logging
import azure.functions as func
import json 
import base64
import os
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime

app = func.FunctionApp()

storage_account_name = "mystorageaccountname"
directory_name = datetime.now().strftime('%Y%m%d')

def initialize_storage_account():
    try:
        credential = DefaultAzureCredential()
        service_client = DataLakeServiceClient(account_url=f"https://{storage_account_name}.dfs.core.windows.net", credential=credential)
        return service_client
    except Exception as e:
        logging.error(f"Error initializing storage account: {e}")

def create_file_system(service_client: DataLakeServiceClient) -> FileSystemClient:
    try:
        file_system_name = datetime.now().strftime('%Y%m%d%H%M%S')  # Generate a unique file system name
        file_system_client = service_client.create_file_system(file_system=file_system_name)
        logging.info(f"File system '{file_system_name}' created successfully.")
        return file_system_client
    except Exception as e:
        logging.error(f"Error creating file system: {e}")

def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)

    return directory_client

def upload_file_to_directory(directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    file_client = directory_client.get_file_client(file_name)

    with open(file=os.path.join(local_path, file_name), mode="rb") as data:
        file_client.upload_data(data, overwrite=True)

def upload_payload_to_data_lake(storage_client, payload_bytes, file_system_client):
    try:
        directory_client = file_system_client.get_directory_client(directory_name)
        
        # Create a new file with a unique name in the directory
        file_name = f"{datetime.now().strftime('%H%M%S')}.bin"
        file_client = directory_client.create_file(file_name)
        
        # Upload payload bytes to the file
        file_client.upload_data(payload_bytes, overwrite=True)
        
        logging.info(f"Payload successfully uploaded to Azure Data Lake Storage: {file_name}")
    except Exception as e:
        logging.error(f"Error uploading payload to Azure Data Lake Storage: {e}")

def upload_file_to_directory(service_client: DataLakeServiceClient):
    try:

        file_system_client = service_client.get_file_system_client(file_system="my-file-system")

        directory_client = file_system_client.get_directory_client("my-directory")
        
        file_client = directory_client.create_file("uploaded-file.txt")
        local_file = open("C:\\file-to-upload.txt",'r')

        file_contents = local_file.read()

        file_client.append_data(data=file_contents, offset=0, length=len(file_contents))

        file_client.flush_data(len(file_contents))

    except Exception as e:
      print(e)

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        

        storage_client = initialize_storage_account()
        file_system_client = create_file_system(storage_client)
        upload_payload_to_data_lake(storage_client, payload_bytes, file_system_client)
    else:
        logging.warning('No payload found in the event.')

Tags: azure, azure-functions, azure-data-lake
1 Answer


You can use the code below (I used an HTTP trigger), which you can integrate into your Event Grid function:

import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
import logging
import json

app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    rith_con_name = "rithwik"                  # container (file system) name
    rith_dir_path = "rithfolder1/rithfolder2"  # directory path inside the container
    rith_outfile_name = "test.txt"             # name of the file to create
    rith_data = {
        "name":"rithwik",
        "id":8
        }
    rith_cs = "DefaultEndpointsProtocol=https;AccountName=rith32;AccountKey=9Ek4f80ttpwPjCg==;EndpointSuffix=core.windows.net"
    # Build the Data Lake clients, serialize the payload and upload it to the target path
    rith_sc = DataLakeServiceClient.from_connection_string(rith_cs)
    rith_fsc = rith_sc.get_file_system_client(rith_con_name)
    rith_des_path = rith_dir_path + "/" + rith_outfile_name
    json_rith = json.dumps(rith_data).encode()
    rith_fc = rith_fsc.get_file_client(rith_des_path)
    rith_fc.upload_data(json_rith, overwrite=True)
    return func.HttpResponse(f"Hello, Rithwik Bojja. This HTTP triggered function executed successfully.")

rith_con_name is the container name and rith_cs is the connection string.
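
Since the question uses an Event Grid trigger rather than HTTP, the same upload pattern can be moved into that handler. Below is a minimal sketch under stated assumptions: the connection string is read from a hypothetical app setting named DATALAKE_CONNECTION_STRING, and the target container (here rithwik, as above) already exists:

import base64
import logging
import os
from datetime import datetime

import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')
    if not payload_base64:
        logging.warning('No payload found in the event.')
        return

    payload_bytes = base64.b64decode(payload_base64)

    # Hypothetical app setting holding the storage account connection string
    service_client = DataLakeServiceClient.from_connection_string(os.environ["DATALAKE_CONNECTION_STRING"])
    file_system_client = service_client.get_file_system_client("rithwik")

    # One file per event, grouped by date, e.g. 20240604/172435123456.json
    file_path = datetime.now().strftime('%Y%m%d/%H%M%S%f') + ".json"
    file_client = file_system_client.get_file_client(file_path)
    file_client.upload_data(payload_bytes, overwrite=True)

Keeping the connection string in an application setting avoids hard-coding the account key in the function code.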

requirements.txt:

azure-functions
azure-storage-file-datalake

Output: the file test.txt is created under rithfolder1/rithfolder2 in the container.
