Serializing orchestrator input in Durable Functions


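Using .NET 8 and Azure Functions in the isolated worker model, I have an EventGridTrigger function that starts an orchestrator and passes it a typed input as a parameter. The parameter's type is part of the following class hierarchy: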

[JsonDerivedType(typeof(IntegrationEventBase), typeDiscriminator: "IntegrationEventBase")]
[JsonDerivedType(typeof(WorkItemIntegrationEventBase), typeDiscriminator: "WorkItemIntegrationEventBase")]
public class IntegrationEventBase
{
    public string? Traceparent { get; set; }
}

[JsonDerivedType(typeof(WorkItemCreatedIntegrationEvent), typeDiscriminator: "WorkItemCreatedIntegrationEvent")]
[JsonDerivedType(typeof(WorkItemProgressedIntegrationEvent), typeDiscriminator: "WorkItemProgressedIntegrationEvent")]
public class WorkItemIntegrationEventBase : IntegrationEventBase
{
    public string? WorkItemId { get; set; }

    public ushort? Quantity { get; set; }
}

public class WorkItemCreatedIntegrationEvent : WorkItemIntegrationEventBase
{
}

public class WorkItemProgressedIntegrationEvent : WorkItemIntegrationEventBase
{
    public WorkItemStatus Status { get; set; }
}

EventGridTrigger function

This simply deserializes the event received from Event Grid and passes it to the orchestrator as input:

[Function("WorkItemProgressedIntegrationEventHandler")]
public async Task WorkItemProgressedIntegrationEventHandler([EventGridTrigger] EventGridEvent eventGridEvent,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContext)
{
    var @event = _serializer.Deserialize<WorkItemProgressedIntegrationEvent>(eventGridEvent.Data.ToString());

    await client.ScheduleNewOrchestrationInstanceAsync(
        Constants.WORKFLOW_ORCHESTRATOR, @event);
}

Orchestrator

The orchestrator receives a WorkItemIntegrationEventBase and then calls an entity in one of two ways, depending on the event's actual runtime type (polymorphism).

[Function("Orchestrator")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context, WorkItemIntegrationEventBase @event)
{
    var workflowEntityId = BuildEntityIds(@event);

    if (@event is WorkItemCreatedIntegrationEvent createdEvent)
        await context.Entities.SignalEntityAsync(workflowEntityId, nameof(IDurableWorkflow.EnqueueWork),
            WorkItemInput.FromWorkItemIntegrationEvent(createdEvent));
    else if (@event is WorkItemProgressedIntegrationEvent progressedEvent)
        await context.Entities.SignalEntityAsync(workflowEntityId, nameof(IDurableWorkflow.ProgressWork),
            WorkItemInput.FromWorkItemIntegrationEvent(progressedEvent));
}

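The problem: in the orchestrator, @event is not deserialized with the type information of the type used in the event trigger, so both of the checks above are always false.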

I have tried almost everything suggested here, without luck, so I must still be missing something.
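
One possible explanation, offered as an assumption rather than a confirmed diagnosis: System.Text.Json writes the "$type" discriminator only when a value is serialized through a type that carries the [JsonDerivedType] attributes. If the input reaches the serializer typed as object (or as the concrete leaf class), no discriminator is written, and reading the payload back as the base type yields a plain base instance. The small program below uses only System.Text.Json and a trimmed-down copy of the hierarchy (the Demo* names are hypothetical). Whether the Durable Functions data converter actually takes the object path is not shown in the question and is assumed here.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

var progressed = new DemoProgressedEvent { WorkItemId = "42", Status = "Completed" };

// Declared type is the attributed base class, so "$type" is written.
string withDiscriminator = JsonSerializer.Serialize<DemoEventBase>(progressed);

// Declared type is object: the runtime leaf type is serialized, and the leaf
// carries no [JsonDerivedType] configuration, so "$type" is not written.
string withoutDiscriminator = JsonSerializer.Serialize<object>(progressed);

// Both payloads are read back through the base type, as the orchestrator does.
DemoEventBase? fromTagged = JsonSerializer.Deserialize<DemoEventBase>(withDiscriminator);
DemoEventBase? fromUntagged = JsonSerializer.Deserialize<DemoEventBase>(withoutDiscriminator);

Console.WriteLine(withDiscriminator.Contains("$type"));     // True
Console.WriteLine(withoutDiscriminator.Contains("$type"));  // False
Console.WriteLine(fromTagged is DemoProgressedEvent);       // True
Console.WriteLine(fromUntagged is DemoProgressedEvent);     // False: plain base instance

// Hypothetical, trimmed-down stand-ins for the hierarchy in the question.
[JsonDerivedType(typeof(DemoCreatedEvent), typeDiscriminator: "Created")]
[JsonDerivedType(typeof(DemoProgressedEvent), typeDiscriminator: "Progressed")]
public class DemoEventBase
{
    public string? WorkItemId { get; set; }
}

public class DemoCreatedEvent : DemoEventBase { }

public class DemoProgressedEvent : DemoEventBase
{
    public string? Status { get; set; }
}
```

If the last line prints False in your environment too, the symptom in the orchestrator would be consistent with the discriminator never being written on the trigger side.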

c# azure-functions azure-durable-functions
1 Answer

This works for me.

I used simple sample data and added a logger to print which branch of the if/else is taken.

To pass the event value to the orchestration, pass it as the named input argument (input:@event):

string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(Function1),input:@event);

My code:

Function1.cs:

using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace FunctionApp3
{
    public static class Function1
    {

        [Function(nameof(Function1))]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context, WorkItemIntegrationEventBase @event)
        {
            var entity = @event;

            ILogger logger = context.CreateReplaySafeLogger(nameof(Function1));
            logger.LogInformation($"Saying hello. \nValues are: \n \t{entity.WorkItemId} \n \t{entity.Quantity}");

            if (@event is WorkItemCreatedIntegrationEvent cloudEvent)
            {
                logger.LogInformation("it is a WorkItemCreatedIntegrationEvent");
            }
            else
            {
                logger.LogInformation("it is a WorkItemProgressedIntegrationEvent");
            }
        }

        [Function("EventGrid")]
        public static async Task Run([EventGridTrigger] EventGridEvent cloudEvent,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            ILogger logger = executionContext.GetLogger("Function1_HttpStart");

            var @event = JsonSerializer.Deserialize<WorkItemProgressedIntegrationEvent>(cloudEvent.Data.ToString());

            
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(Function1),input:@event);

            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }
    }

    [JsonDerivedType(typeof(IntegrationEventBase), typeDiscriminator: "IntegrationEventBase")]
    [JsonDerivedType(typeof(WorkItemIntegrationEventBase), typeDiscriminator: "WorkItemIntegrationEventBase")]
    public class IntegrationEventBase
    {
        public string? Traceparent { get; set; }
    }

    [JsonDerivedType(typeof(WorkItemCreatedIntegrationEvent), typeDiscriminator: "WorkItemCreatedIntegrationEvent")]
    [JsonDerivedType(typeof(WorkItemProgressedIntegrationEvent), typeDiscriminator: "WorkItemProgressedIntegrationEvent")]
    public class WorkItemIntegrationEventBase : IntegrationEventBase
    {
        public string? WorkItemId { get; set; }

        public string? Quantity { get; set; }
    }

    public class WorkItemCreatedIntegrationEvent : WorkItemIntegrationEventBase
    {
    }

    public class WorkItemProgressedIntegrationEvent : WorkItemIntegrationEventBase
    {
        public WorkItemStatus Status { get; set; }
    }

    public enum WorkItemStatus
    {
        InProgress,
        Completed
    }
}
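
If the orchestrator still receives a plain base instance, one workaround that may be worth trying is to serialize the event yourself through the attributed base type and pass the resulting string as the orchestration input. This is a sketch under that assumption, not something tested in the code above: EventEnvelope, Pack and Unpack are hypothetical names, and the Sketch* classes stand in for the real hierarchy. Note also that the bare else branch in the orchestrator above logs "it is a WorkItemProgressedIntegrationEvent" for any instance that is not a created event, including a plain base instance, so it does not by itself confirm the runtime type.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Trigger side: build the string that would be passed as the orchestration input.
string payload = EventEnvelope.Pack(new SketchProgressedEvent { WorkItemId = "42" });

// Orchestrator side: the input parameter would be a string, unpacked on entry.
SketchEventBase? restored = EventEnvelope.Unpack(payload);

// Dispatch on the runtime type; null or a plain base instance falls to "base".
string branch = restored switch
{
    SketchCreatedEvent => "created",
    SketchProgressedEvent => "progressed",
    _ => "base"
};
Console.WriteLine(branch);

// Hypothetical helper: always goes through the base type so "$type" is written.
public static class EventEnvelope
{
    public static string Pack(SketchEventBase e) =>
        JsonSerializer.Serialize<SketchEventBase>(e);

    public static SketchEventBase? Unpack(string json) =>
        JsonSerializer.Deserialize<SketchEventBase>(json);
}

// Hypothetical, trimmed-down stand-ins for the hierarchy in the question.
[JsonDerivedType(typeof(SketchCreatedEvent), typeDiscriminator: "Created")]
[JsonDerivedType(typeof(SketchProgressedEvent), typeDiscriminator: "Progressed")]
public class SketchEventBase
{
    public string? WorkItemId { get; set; }
}

public class SketchCreatedEvent : SketchEventBase { }

public class SketchProgressedEvent : SketchEventBase { }
```

In the trigger, the string returned by Pack would be what goes into input:, and the orchestrator parameter would be declared as string and passed to Unpack before the type checks.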

Program.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        services.AddApplicationInsightsTelemetryWorkerService();
        services.ConfigureFunctionsApplicationInsights();
    })
    .ConfigureLogging(logging =>
    {
        logging.Services.Configure<LoggerFilterOptions>(options =>
        {
            LoggerFilterRule defaultRule = options.Rules.FirstOrDefault(rule => rule.ProviderName
                == "Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider");
            if (defaultRule is not null)
            {
                options.Rules.Remove(defaultRule);
            }
        });
    })
    .Build();

host.Run();

INPUT

Sample event in the Event Grid schema

[
    {
        "id":"test-id",
        "data":{"WorkItemId": "value1", "Quantity": "value2"},
        "subject":"test-subject",
        "eventType":"test-event-1",
        "eventTime":"2024-11-3",
        "dataVersion":"1.0"
    }
]

OUTPUT
