Relationship between execution time and response time in Durable Functions


I implemented a data analysis with Durable Functions so that each instance runs exactly one function.

Using the Durable Functions extension for VS Code, the execution details look as shown in the figure.

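The wait between the "exploratory_data_analysis" instance and the "decision_tree_regression" instance is short. However, "random_forest" starts a considerable time after the previous function ends, which makes the overall response time longer.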

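A long execution time for "random_forest" is unavoidable, but why does it take so long for the instance to start? The data passed in is the same as for "ridge_regression" and the others, so the overhead should not come from transferring data.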

Also, does the time shown by this extension represent the time from when the instance starts until the code returns?

azure azure-functions instance azure-durable-functions
1 Answer

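The random_forest function starts last because it is the final activity and runs only after all the previous activities have finished. An Azure Durable Functions orchestrator runs activities in the order in which the code yields them. As I understand it, random_forest is the last activity in your orchestration, so it shows up last. The horizontal bar represents the time from when the function was created (started) until its last-updated (completion) time.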
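If the model activities do not depend on one another, one option is the fan-out/fan-in pattern, where the orchestrator schedules them together and waits once. The sketch below is only an illustration under that assumption: `parallel_orchestrator` and `MODEL_ACTIVITIES` are names I made up, the registration decorator (`@app.orchestration_trigger(context_name="context")`) is left out so the snippet stands alone, and it assumes each activity accepts the prepared dataset directly as its input. `call_activity` and `task_all` are methods of the Python `DurableOrchestrationContext`.

```python
# Hypothetical list of independent model activities (names taken from the question).
MODEL_ACTIVITIES = [
    "simple_regression",
    "multiple_regression",
    "ridge_regression",
    "k_nearest_neighbor",
    "decision_tree_regression",
    "random_forest",
]


def parallel_orchestrator(context):
    # Step 1: prepare the dataset once.
    data = yield context.call_activity("prepare_data", "")

    # Step 2 (fan-out): create all activity tasks without yielding in between,
    # so none of them has to wait for the previous one to finish.
    tasks = [context.call_activity(name, data) for name in MODEL_ACTIVITIES]

    # Step 3 (fan-in): a single yield that resumes once every task has completed.
    results = yield context.task_all(tasks)

    # Pair each activity name with its result for easier inspection.
    return dict(zip(MODEL_ACTIVITIES, results))
```

Whether this shortens the total time depends on how many workers and how much CPU the hosting plan provides. On a single small instance, CPU-bound models may still effectively run one after another.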

I tried to run a data analysis similar to yours, and my results are below:

Reference: my SO answer

My function_app.py is below. Make sure the load in your code is distributed properly by passing data between the functions efficiently:

import pickle
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing  # Dataset
from sklearn.model_selection import train_test_split 
from sklearn.linear_model import Lasso  
from sklearn.linear_model import Ridge 
from sklearn.metrics import mean_squared_error  # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler 

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
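    # Wait for the orchestration to finish (up to the default timeout). If it does not
    # finish in time, this helper itself returns the usual check-status response.
    return await client.wait_for_completion_or_create_check_status_response(req, instance_id)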

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    # prepare_data returns a dict with the dataset and its metadata. Pass it to each
    # activity as-is so that arg['data'] and arg['target_column'] resolve correctly.
    data = yield context.call_activity("prepare_data", '')
    simple = yield context.call_activity("simple_regression", data)
    multiple = yield context.call_activity("multiple_regression", data)
    exploratory_data_analysis = yield context.call_activity("exploratory_data_analysis", data)
    data_processing = yield context.call_activity("data_processing", data)
    multivariate_linear_regression = yield context.call_activity("multivariate_linear_regression", data)
    ridge_regression = yield context.call_activity("ridge_regression", data)
    k_nearest_neighbor = yield context.call_activity("k_nearest_neighbor", data)
    decision_tree_regression = yield context.call_activity("decision_tree_regression", data)
    random_forest = yield context.call_activity("random_forest", data)
    return "finished"

### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    # prepare data
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
    print(exp_data.columns.tolist())  # explanatory variables
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices']) # target variable
    data = pd.concat([exp_data, tar_data], axis=1) # merge data
    print("Column Names:")
    print(data.columns.tolist()) 
    # Delete anomalous values
    data = data[data['HouseAge'] != 52]
    data = data[data['HousingPrices'] != 5.00001]

    # Create useful variables
    data['Household'] = data['Population']/data['AveOccup']
    data['AllRooms'] = data['AveRooms']*data['Household']
    data['AllBedrms'] = data['AveBedrms']*data['Household']

    # Ensure 'MedInc' column doesn't contain null or missing values
    data = data.dropna(subset=['MedInc'])

    # Create a dictionary to store multiple data items
    prepared_data = {
        'data': data.to_dict(),
        'columns': data.columns.tolist(),
        'target_column': 'HousingPrices',
        'MedInc': data['MedInc'].tolist()  # Add 'MedInc' column to the dictionary as a list
    }
    return prepared_data

@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])

        # Handling missing or NaN values
        data.dropna(inplace=True)

        # Selecting the explanatory variable 'MedInc' and target 'HousingPrices'
        X_simple = data[['MedInc']]
        y = data[arg['target_column']]

        # Check lengths of X_simple and y
        if len(X_simple) != len(y):
            return "Lengths of X_simple and y do not match"

        # Initialize and fit the linear regression model
        simple_model = LinearRegression()
        simple_model.fit(X_simple, y)

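        # Activity results must be JSON-serializable, so return the fitted
        # parameters rather than the estimator object itself.
        return {
            'coef': simple_model.coef_.tolist(),
            'intercept': float(simple_model.intercept_),
            'status': 'success'
        }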
    except Exception as e:
        return f"Error: {str(e)}"

### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])

        # Handling missing or NaN values
        data.dropna(inplace=True)

        # Selecting multiple explanatory variables and target 'HousingPrices'
        X_multiple = data.drop(columns=[arg['target_column']])  # Drop the target column
        y = data[arg['target_column']]

        # Check lengths of X_multiple and y
        if len(X_multiple) != len(y):
            return "Lengths of X_multiple and y do not match"

        # Initialize and fit the multiple regression model
        multiple_model = LinearRegression()
        multiple_model.fit(X_multiple, y)

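        # Activity results must be JSON-serializable, so return the fitted
        # parameters rather than the estimator object itself.
        return {
            'coef': multiple_model.coef_.tolist(),
            'intercept': float(multiple_model.intercept_),
            'status': 'success'
        }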
    except Exception as e:
        return f"Error: {str(e)}"
    

@app.activity_trigger(input_name="arg")
def exploratory_data_analysis(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])

        # Perform exploratory data analysis tasks here
        # For example: summary statistics, visualizations, etc.

        # Placeholder - perform some EDA tasks here
        eda_result = "Exploratory Data Analysis performed"

        return {
            'result': eda_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### data processing ###
@app.activity_trigger(input_name="arg")
def data_processing(arg: dict):
    try:
        # Placeholder - Add data processing steps here
        # This could involve data cleaning, normalization, etc.

        processed_data_result = "Data processing completed"

        return {
            'result': processed_data_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
    
@app.activity_trigger(input_name="arg")
def multivariate_linear_regression(arg: dict):
    try:
        # Placeholder - Implement Multivariate Linear Regression here
        # Use the processed data from 'arg' dictionary

        mlr_result = "Multivariate Linear Regression completed"

        return {
            'result': mlr_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"


### ridge regression ###
@app.activity_trigger(input_name="arg")
def ridge_regression(arg: dict):
    try:
        # Placeholder - Implement Ridge Regression here

        ridge_result = "Ridge Regression completed"

        return {
            'result': ridge_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### k-nearest neighbor ###
@app.activity_trigger(input_name="arg")
def k_nearest_neighbor(arg: dict):
    try:
        # Placeholder - Implement K-Nearest Neighbor here

        knn_result = "K-Nearest Neighbor completed"

        return {
            'result': knn_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### decision tree regression ###
@app.activity_trigger(input_name="arg")
def decision_tree_regression(arg: dict):
    try:
        # Placeholder - Implement Decision Tree Regression here

        dt_result = "Decision Tree Regression completed"

        return {
            'result': dt_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### random forest ###
@app.activity_trigger(input_name="arg")
def random_forest(arg: dict):
    try:
        # Placeholder - Implement Random Forest here

        rf_result = "Random Forest completed"

        return {
            'result': rf_result,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"
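
Note that every `call_activity` in the orchestrator above sends the full dataset as the activity input, and Durable Functions serializes inputs and outputs to JSON and persists them in the task hub. A rough way to gauge that cost is to measure the serialized size of the payload. This is a minimal sketch: `payload_size_bytes` and the toy payload are hypothetical stand-ins, not the real housing data.

```python
import json


def payload_size_bytes(payload) -> int:
    """Return the size in bytes of the payload once serialized to UTF-8 JSON."""
    return len(json.dumps(payload).encode("utf-8"))


# Toy stand-in shaped like the output of DataFrame.to_dict(): {column: {row_index: value}}.
# The row count is an arbitrary assumption chosen only for illustration.
rows = 20000
toy_payload = {
    "data": {"MedInc": {str(i): float(i) for i in range(rows)}},
    "target_column": "HousingPrices",
}

size = payload_size_bytes(toy_payload)
print(f"about {size / 1024:.1f} KiB is serialized for each activity call")
```

With the Azure Storage provider, a queue message is limited to 64 KB, and as far as I know larger payloads are offloaded to blob storage, which adds latency to every call. Passing a blob name or file path instead of the data itself is one way to keep the messages small.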

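In your scenario, random_forest takes a long time to start after the previous activity completes, partly because the processing in the functions is heavy. Whether you run this locally or in a Function App, it uses the default Azure storage account referenced by the AzureWebJobsStorage connection string. If you want Durable Functions to run more efficiently, you can add a separate storage account for Durable Functions as follows: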

My local.settings.json with the additional MyStorageAccountAppSetting:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=vsiddheshrg989173;AccountKey=xxxxxStOyfWOw==;EndpointSuffix=core.windows.net",
    "MyStorageAccountAppSetting" : "DefaultEndpointsProtocol=https;AccountName=silicon1;AccountKey=xxxxxaBKbO/PA+AStKWFvaQ==;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
  }
}

You need to apply the same settings in the Function App's environment variables (application settings).

My host.json, which references MyStorageAccountAppSetting:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "connectionStringName": "MyStorageAccountAppSetting"
      }
    }
  }
}
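
To double-check which app setting the Durable extension will read, you can inspect host.json yourself. The helper below is only a sketch: `durable_connection_name` and `is_configured` are hypothetical names, and it assumes the extension falls back to `AzureWebJobsStorage` when `connectionStringName` is absent.

```python
import json
import os

# Assumed default when host.json does not name a connection explicitly.
DEFAULT_CONNECTION = "AzureWebJobsStorage"


def durable_connection_name(host_json_text: str) -> str:
    """Return the app setting name that the durableTask storage provider points to."""
    host = json.loads(host_json_text)
    provider = (
        host.get("extensions", {})
        .get("durableTask", {})
        .get("storageProvider", {})
    )
    return provider.get("connectionStringName", DEFAULT_CONNECTION)


def is_configured(host_json_text: str, env) -> bool:
    """Check that the referenced setting exists and is non-empty in the environment."""
    return bool(env.get(durable_connection_name(host_json_text)))


host_json = (
    '{"extensions": {"durableTask": {"storageProvider": '
    '{"connectionStringName": "MyStorageAccountAppSetting"}}}}'
)
print(durable_connection_name(host_json))  # MyStorageAccountAppSetting
print(is_configured(host_json, os.environ))
```

Running this inside the Function App (or locally, where the values from local.settings.json are loaded as environment variables) shows quickly whether the setting name in host.json and the configured app setting actually match.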

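You can also add concurrency throttle settings for your activity and orchestrator functions, and tune how the work is partitioned by specifying the partition count in host.json, for example: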

host.json:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "durableTask": {
      "storageProvider": {
        "connectionStringName": "MyStorageAccountAppSetting",
        "partitionCount": 3
      },
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 10
    }
  }
}

The horizontal time runs from createdTime (UTC) to lastUpdatedTime (UTC).
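
To compare that reading of the timeline with the raw numbers, you can compute the elapsed time from the two timestamps in a status payload. This is a minimal sketch: the timestamp values are made up, `parse_utc` and `elapsed_seconds` are hypothetical helper names, and it assumes the timestamps use the `YYYY-MM-DDTHH:MM:SSZ` form.

```python
from datetime import datetime, timezone


def parse_utc(timestamp: str) -> datetime:
    """Parse a UTC timestamp such as '2023-12-01T10:00:00Z'."""
    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def elapsed_seconds(status: dict) -> float:
    """Seconds between createdTime and lastUpdatedTime in a status payload."""
    created = parse_utc(status["createdTime"])
    updated = parse_utc(status["lastUpdatedTime"])
    return (updated - created).total_seconds()


# Hypothetical values for illustration only.
status = {
    "createdTime": "2023-12-01T10:00:00Z",
    "lastUpdatedTime": "2023-12-01T10:02:30Z",
}
print(elapsed_seconds(status))  # 150.0
```

Keep in mind that this span covers everything between creation and the last update, including any time spent waiting in the queue, not just the time the function body was executing.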

With the settings above in my host.json, my random_forest activity is triggered correctly.

I referred to these two GitHub documents for the host.json settings. They also give more insight into improving the performance of Azure Durable Functions:

  1. azure-docs/articles/azure-functions/durable/durable-functions-perf-and-scale.md at main · MicrosoftDocs/azure-docs (github.com)

  2. azure-docs/articles/azure-functions/durable/durable-functions-azure-storage-provider.md at main · MicrosoftDocs/azure-docs (github.com)
