耐用功能的传输格式

问题描述 投票:0回答:1

我正在尝试使用 Durable Functions 按函数实现数据分析。

通过函数实现时,我需要传输数据帧等,所以我尝试通过使其pickle来交换数据。但是,当我实现如下所示的代码时,出现如图所示的错误,并且 Vscode 卡住了。

原因是什么?如何解决?

下面是我要实现的部分代码。

import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing  # Dataset
from sklearn.model_selection import train_test_split 
from sklearn.linear_model import Lasso  
from sklearn.linear_model import Ridge 
from sklearn.metrics import mean_squared_error  # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler 

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    return client.create_check_status_response(req, instance_id)

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    data = yield context.call_activity("prepare_data", '')
    simple = yield context.call_activity("simple_regression", {"data": data})
    multiple = yield context.call_activity("multiple_regression", {"data": data})
    return "finished"

### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    # prepare data
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names) # 説明変数
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices']) # 目的変数
    data = pd.concat([exp_data, tar_data], axis=1) # データを結合

    # Delete anomalous values
    data = data[data['HouseAge'] != 52]
    data = data[data['HousingPrices'] != 5.00001]

    # Create useful variables
    data['Household'] = data['Population']/data['AveOccup']
    data['AllRooms'] = data['AveRooms']*data['Household']
    data['AllBedrms'] = data['AveBedrms']*data['Household']

    data = pickle.dumps(data)
    return data

### simple regression analysis ###
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    data = pickle.loads(arg['data'])
    exp_var = 'MedInc'
    tar_var = 'HousingPrices'

    # Remove outliers
    q_95 = data[exp_var].quantile(0.95)
    data = data[data[exp_var] < q_95]

    # Split data into explanatory and objective variables
    X = data[[exp_var]]
    y = data[[tar_var]]

    # learn
    model = LinearRegression()
    model.fit(X, y)

    model = pickle.dumps(model)
    return model

### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    data = pickle.loads(arg['data'])
    exp_vars = ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
    tar_var = 'HousingPrices'

    # Remove outliers
    for exp_var in exp_vars:
        q_95 = data[exp_var].quantile(0.95)
        data = data[data[exp_var] < q_95]

    # Split data into explanatory and objective variables
    X = data[exp_vars]
    y = data[[tar_var]]

    # Split into training and test data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    #  Standardize X_train
    scaler = StandardScaler()
    scaler.fit(X_train)
    X_train_scaled = scaler.transform(X_train)
    X_train_scaled = pd.DataFrame(X_train_scaled, columns = exp_vars)

    # learn
    model = LinearRegression()
    model.fit(X_train_scaled, y_train)

    model = pickle.dumps(model)
    X_train_scaled = pickle.dumps(X_train_scaled)
    y_train = pickle.dumps(y_train)
    X_test = pickle.dumps(X_test)
    y_test = pickle.dumps(y_test)
    scaler = pickle.dumps(scaler)
    return model, X_train_scaled, y_train, X_test, y_test, scaler

python visual-studio-code azure-functions pickle azure-durable-functions
1个回答
0
投票

为了解决您的错误,请将您的数据转换为prepared_data中的字典dict,然后在您的Durable Function代码中运行一个示例simple_regression和multiple_regression,我在您的function_app.py代码中进行了一些更改,如下所示:-

我的function_app.py:-

import pickle
import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing  # Dataset
from sklearn.model_selection import train_test_split 
from sklearn.linear_model import Lasso  
from sklearn.linear_model import Ridge 
from sklearn.metrics import mean_squared_error  # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler 

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    return client.create_check_status_response(req, instance_id)

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    data = yield context.call_activity("prepare_data", '')
    simple = yield context.call_activity("simple_regression", {"data": data})
    multiple = yield context.call_activity("multiple_regression", {"data": data})
    return "finished"

### activity function ###
@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    # prepare data
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
    print(exp_data.columns.tolist) # explanatory variables
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices']) # target variable
    data = pd.concat([exp_data, tar_data], axis=1) # merge data
    print("Column Names:")
    print(data.columns.tolist()) 
    # Delete anomalous values
    data = data[data['HouseAge'] != 52]
    data = data[data['HousingPrices'] != 5.00001]

    # Create useful variables
    data['Household'] = data['Population']/data['AveOccup']
    data['AllRooms'] = data['AveRooms']*data['Household']
    data['AllBedrms'] = data['AveBedrms']*data['Household']

    # Ensure 'MedInc' column doesn't contain null or missing values
    data = data.dropna(subset=['MedInc'])

    # Create a dictionary to store multiple data items
    prepared_data = {
        'data': data.to_dict(),
        'columns': data.columns.tolist(),
        'target_column': 'HousingPrices',
        'MedInc': data['MedInc'].tolist()  # Add 'MedInc' column to the dictionary as a list
    }
    return prepared_data

@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])

        # Handling missing or NaN values
        data.dropna(inplace=True)

        # Selecting the explanatory variable 'MedInc' and target 'HousingPrices'
        X_simple = data[['MedInc']]
        y = data[arg['target_column']]

        # Check lengths of X_simple and y
        if len(X_simple) != len(y):
            return "Lengths of X_simple and y do not match"

        # Initialize and fit the linear regression model
        simple_model = LinearRegression()
        simple_model.fit(X_simple, y)

        return {
            'model': simple_model,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    try:
        # Convert dictionary back to a DataFrame
        data = pd.DataFrame.from_dict(arg['data'])

        # Handling missing or NaN values
        data.dropna(inplace=True)

        # Selecting multiple explanatory variables and target 'HousingPrices'
        X_multiple = data.drop(columns=[arg['target_column']])  # Drop the target column
        y = data[arg['target_column']]

        # Check lengths of X_multiple and y
        if len(X_multiple) != len(y):
            return "Lengths of X_multiple and y do not match"

        # Initialize and fit the multiple regression model
        multiple_model = LinearRegression()
        multiple_model.fit(X_multiple, y)

        return {
            'model': multiple_model,
            'status': 'success'
        }
    except Exception as e:
        return f"Error: {str(e)}"

输出:-

enter image description here

所有编排和活动功能均已成功运行,请参阅以下内容:-

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.