HTTP trigger not found in Azure Function

Problem description · Votes: 0 · Answers: 1

I am writing an Azure Function in Python using VS Code that transforms Excel worksheets, and I want to store the result in Azure Blob Storage. It works on localhost, but after I deploy the function through the Azure portal it reports that no HTTP trigger was found. I am using the code below:

import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import pandas as pd
from datetime import datetime
from azure.storage.blob import BlobServiceClient, generate_container_sas, ContainerSasPermissions, generate_blob_sas, BlobSasPermissions, ContainerClient
from azure.storage.blob import generate_blob_sas
from datetime import timedelta
import os

logging.info('fn started')

def excel_transform():
    logging.info('transform fn triggered')

    ACCOUNTNAME = "your-account-name"
    KEY = "your-account-key"
    TARGET_CONTAINER = "ur-target-container" # uncomment and give target container when used in azure
    SOURCE_CONTAINER = "ur-source-container"

    # Create a BlobServiceClient using your account URL and credentials
    source_blob_service_client = BlobServiceClient (account_url=f" ", credential=KEY))

    # Get a reference to the Blob Container
    source_container_client = source_blob_service_client.get_container_client(SOURCE_CONTAINER)

    # List all blobs (files) in the source container
    source_blobs = source_container_client.list_blobs()

    # Define the desired permissions and expiration for the SAS tokens
    permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
    expiry=datetime.utcnow() + timedelta(hours=6)

    # Create and print SAS tokens and URLs for each blob(file)
    chem_daily = pd.DataFrame()
   

    def generate_sas_token_url(blob,container):  # To generate SAS token and URL
        logging.info('sas token created')

        # Generate the SAS token for the current blob
        sas_token = generate_blob_sas(
            container_name=container,
            blob_name= blob,
            account_name=ACCOUNTNAME,
            account_key=KEY,  # Use account_key for generating a user delegation SAS token
            permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
            expiry=datetime.utcnow() + timedelta(hours=6)
        )

        # Create the URL with the SAS token
        blob_url_with_sas = f"https://{ACCOUNTNAME}.blob.core.windows.net/{container}/{blob}?{sas_token}"
        blob_url_with_sas = blob_url_with_sas.replace(" ", "%20")
        return(blob_url_with_sas)

    # To read and transform files from source container
    def list_source_blob(): return source_container_client.list_blobs()
    logging.info('blobs listed')
    for blob in list_source_blob():
        blob_name = blob.name
        source_url = generate_sas_token_url(blob_name,SOURCE_CONTAINER)
        logging.info(f'{blob_name} url created')

    list_source_blob()

    # To obtain LocationId from filename
    excel_file_name = str(blob_name)
    file_part = excel_file_name.split('-')
    LocationId_1 = file_part[1].split('.')
    LocationId = LocationId_1[0]

    ]

    # To append corresponding dataframes
        if sheet_name == 'process - Daily':
            process_daily = pd.concat([process_daily,df])

    # List of all dataframes
    df_list = [process_daily]
    process_daily.attrs['name']='ProcessConsumption'

    target_container_client = source_blob_service_client.get_container_client(TARGET_CONTAINER)

    for df in df_list:
        file_name = f"excel_transformed/{df.attrs['name']}.csv"   # Filename created
        output_csv = df.to_csv(index = False)          # Converted df to csv file

        target_blob_url = generate_sas_token_url(file_name,TARGET_CONTAINER)
        target_blob_client = BlobClient.from_blob_url(target_blob_url)
        target_blob_client.upload_blob(output_csv, overwrite = True)


app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    excel_transform()

    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    else:
        return func.HttpResponse(
             "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
             status_code=200
        )

What I have tried: 1. Downgraded Azure Functions Core Tools. 2. Added the necessary settings to both the local and the Azure function configuration.

python git azure azure-functions azure-http-trigger
1 Answer

Votes: 0

I tried deploying the same code given in the question, but I could see a few build errors (for example, the unbalanced parentheses on the BlobServiceClient line), so I was also unable to deploy the function to Azure.
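Build errors like these can be caught before deploying: if the entry-point module fails to compile, the Functions host indexes zero functions and the portal reports that no HTTP trigger was found. A minimal local check, as a sketch, assuming the conventional v2-model entry-point name function_app.py:

import py_compile

# Raises py_compile.PyCompileError on a syntax error, such as the extra
# closing parenthesis on the BlobServiceClient line in the question.
py_compile.compile("function_app.py", doraise=True)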

After modifying the code as below, I could deploy the function to Azure and sync the triggers (the sheet-reading loop, missing from the posted code, is sketched in with pandas as an assumption):

import logging
import os
from datetime import datetime, timedelta

import azure.functions as func
import pandas as pd
from azure.storage.blob import (BlobClient, BlobSasPermissions,
                                BlobServiceClient, ContainerClient,
                                ContainerSasPermissions, generate_blob_sas,
                                generate_container_sas)

logging.info('fn started')

def excel_transform():
    logging.info('transform fn triggered')

    ACCOUNTNAME = "your-account-name"
    KEY = "your-account-key"    
    TARGET_CONTAINER = "ur-target-container" # uncomment and give target container when used in azure
    SOURCE_CONTAINER = "ur-source-container"
                                         
    # Create a BlobServiceClient using your account URL and credentials
    source_blob_service_client = BlobServiceClient(account_url="<storage_account_url>", credential=KEY)

    # Get a reference to the Blob Container
    source_container_client = source_blob_service_client.get_container_client(SOURCE_CONTAINER)  

    # List all blobs (files) in the source container
    source_blobs = source_container_client.list_blobs()

    # Define the desired permissions and expiration for the SAS tokens
    permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
    expiry=datetime.utcnow() + timedelta(hours=6)

    # Create and print SAS tokens and URLs for each blob(file)    
    chem_daily = pd.DataFrame()   

    def generate_sas_token_url(blob,container):  # To generate SAS token and URL

        logging.info('sas token created')

        # Generate the SAS token for the current blob

        sas_token = generate_blob_sas(
            container_name=container,
            blob_name= blob,
            account_name=ACCOUNTNAME,
            account_key=KEY,  # Use account_key for generating a user delegation SAS token
            permission=BlobSasPermissions(read=True,write=True,add=True,create=True,execute=True,move=True,ownership=True,permissions=True),
            expiry=datetime.utcnow() + timedelta(hours=6)
        )

        # Create the URL with the SAS token

        blob_url_with_sas =f"https://{ACCOUNTNAME}.blob.core.windows.net/{container}/{blob}?{sas_token}"
        blob_url_with_sas = blob_url_with_sas.replace(" ", "%20") 
        return(blob_url_with_sas)

    # To read and transform files from source container
    def list_source_blob(): return source_container_client.list_blobs()
    logging.info('blobs listed')

    process_daily = pd.DataFrame()
    for blob in list_source_blob():
        blob_name = blob.name
        source_url = generate_sas_token_url(blob_name,SOURCE_CONTAINER)
        logging.info(f'{blob_name} url created')

        # To obtain LocationId from filename
        excel_file_name = str(blob_name)
        file_part = excel_file_name.split('-')
        LocationId_1 = file_part[1].split('.')
        LocationId = LocationId_1[0]

        # To append corresponding dataframes
        # (assumption: the original post omitted the sheet-reading step; here
        # every sheet is read with pandas via the blob's SAS URL, which needs
        # openpyxl installed for .xlsx files)
        for sheet_name, df in pd.read_excel(source_url, sheet_name=None).items():
            if sheet_name == 'process - Daily':
                process_daily = pd.concat([process_daily,df])

    # List of all dataframes
    df_list = [process_daily]
    process_daily.attrs['name']='ProcessConsumption'
    
    target_container_client = source_blob_service_client.get_container_client(TARGET_CONTAINER)  
    
    for df in df_list:
        
        file_name = f"excel_transformed/{df.attrs['name']}.csv"   # Filename created
        output_csv = df.to_csv(index = False)          # Converted df to csv file 

        target_blob_url = generate_sas_token_url(file_name,TARGET_CONTAINER)
        target_blob_client = BlobClient.from_blob_url(target_blob_url)
        target_blob_client.upload_blob(output_csv, overwrite = True)


app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    excel_transform()
    
    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    else:
        return func.HttpResponse(
             "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
             status_code=200
        )
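Once deployed, the trigger can be smoke-tested with a plain HTTP request. A minimal sketch, assuming the default api route prefix and a placeholder function-app name:

import requests

# AuthLevel.ANONYMOUS in the code above means no function key is needed.
# "<your-function-app>" is a placeholder for the real app name.
resp = requests.get(
    "https://<your-function-app>.azurewebsites.net/api/http_trigger",
    params={"name": "test"},
)
print(resp.status_code, resp.text)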

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true"
  }
}
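Note that local.settings.json is only used when running locally; it is not published with the app. If the runtime version in use still requires the EnableWorkerIndexing feature flag for the Python v2 programming model (older 4.x hosts did), the same AzureWebJobsFeatureFlags setting may also need to be added to the Function App's application settings in Azure.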

host.json:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "extensions": 
  { 
    "http": 
      { 
      "routePrefix": "api", 
      "maxOutstandingRequests": 200, 
      "maxConcurrentRequests": 100, 
      "dynamicThrottlesEnabled": true, 
      "hsts": { "isEnabled": true, "maxAge": "10" }, 
      "customHeaders": { "X-Content-Type-Options": "nosniff" } 
      } 
  } 
} 
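With routePrefix kept at api, the http_trigger function above is served at /api/http_trigger, which is the path assumed in the smoke test earlier.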
  • Able to deploy the function to Azure successfully:

(screenshot: deployment output)

Portal:

(screenshot: the function shown in the Azure portal)
