EventHubTrigger EventData[] binding not working


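I am creating an EventHub-triggered C# isolated worker process Azure Function that outputs a list of EventData with headers to another EventHub.

The function works fine when using a string[] binding, but when I use the EventData binding from Azure.Messaging.EventHubs, the function throws the following message in the console: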

System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].
Executed 'Functions.MyFunction' (Failed, Id=4a2fcfa1-0042-4cb8-92d6-75289685b4dd, Duration=15ms)
System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].

Project file

    <TargetFramework>net7.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>

    ...

    <PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.19.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.EventHubs" Version="5.5.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.14.0" />
    <PackageReference Include="Newtonsoft.Json" Version="13.0.3" />

Function code

using Azure.Messaging.EventHubs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using my.Models;
using Newtonsoft.Json;

namespace my.function
{
    public class MyFunction
    {
        [EventHubOutput("EventHubOutput", Connection = "EventHubOutputConnectionString")]
        [FixedDelayRetry(5, "00:00:10")]
        [Function("MyFunction")]
        public EventData[] Run([EventHubTrigger("EventHubInput", Connection = "EventHubInputConnectionString", IsBatched = true)] EventData[] eventDatas)
        {
            List<EventData> eventDataOutputs = new List<EventData> { };

            // Receive events
            foreach (EventData eventData in eventDatas)
            {
                // Deserialize the event body
                string eventJsonBody = eventData.EventBody.ToString();
                MyObject? myObject = JsonConvert.DeserializeObject<MyObject>(eventJsonBody);

                // Append nested elements
                if (myObject != null)
                {
                    eventDataOutputs.AddRange(myObject.NestedFields.Select(nestedField => new EventData(JsonConvert.SerializeObject(nestedField))));
                }
            }
            return eventDataOutputs.ToArray();
        }
    }
}

MyObject class

namespace my.Models
{
    using Newtonsoft.Json;

    public class MyObject
    {
        [JsonProperty("randomField")]
        public string RandomField { get; set; }

        [JsonProperty("nestedFields")]
        public NestedField[] NestedFields { get; set; }
    }
}

NestedField class

namespace my.Models
{
    using Newtonsoft.Json;

    public class NestedField
    {
        [JsonProperty("randomFieldNested")]
        public string RandomField { get; set; }

        [JsonProperty("time")]
        public DateTimeOffset Time { get; set; }

        [JsonProperty("longField")]
        public long LongField { get; set; }
    }
}

Update

I tried with a newer EventHubs, and the events were received.

c# azure azure-functions azure-eventhub
1 Answer
1 vote

I made a few changes to your code, and it works for me.

Code:

        [Function(nameof(Function1))]
        [FixedDelayRetry(5, "00:00:10")]
        [EventHubOutput("eventHubOutput", Connection = "EventHubConnectionAppSetting")]
        public EventData[] Run([EventHubTrigger("eventhubinput", Connection = "EventHubConnectionAppSetting", IsBatched = true)] EventData[] events)
        {
            _logger.LogInformation($"Received {events.Length} events");
            List<EventData> eventDataOutputs = new List<EventData>();
            foreach (EventData eventData in events)
            {
                string eventJsonBody = Encoding.UTF8.GetString(eventData.EventBody);

                MyObject[]? myObjects = JsonConvert.DeserializeObject<MyObject[]>(eventJsonBody);

                if (myObjects != null)
                {
                    foreach (MyObject myObject in myObjects)
                    {
                        if (myObject.NestedFields != null)
                        {
                            foreach (NestedField nestedField in myObject.NestedFields)
                            {
                                // Extract field values from the NestedField object
                                string randomFieldNested = nestedField.RandomField;
                                DateTimeOffset time = nestedField.Time;
                                long longField = nestedField.LongField;

                                // Create a new EventData instance with the extracted field values
                                string eventDataJson = JsonConvert.SerializeObject(new
                                {
                                    RandomFieldNested = randomFieldNested,
                                    Time = time,
                                    LongField = longField
                                });
                                EventData newEventData = new EventData(Encoding.UTF8.GetBytes(eventDataJson));

                                // Add the new EventData to the output list
                                eventDataOutputs.Add(newEventData);
                            }
                        }
                    }
                }
            }
            _logger.LogInformation("Data Processed.....");
            return eventDataOutputs.ToArray();

        }
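
To check the flattening logic from the code above without running the Functions host, it can be pulled into a plain helper. This is only a minimal sketch, under these assumptions: each event body is a UTF-8 JSON array of MyObject (as the MyObject[] deserialization above implies), EventFlattener and Flatten are hypothetical names that are not part of any SDK, and System.Text.Json is swapped in for Newtonsoft.Json so that the sample needs no package references. Unlike the anonymous object above, it serializes each NestedField directly, so the output keys follow the JSON property names (randomFieldNested, time, longField).

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

public class NestedField
{
    [JsonPropertyName("randomFieldNested")]
    public string? RandomField { get; set; }

    [JsonPropertyName("time")]
    public DateTimeOffset Time { get; set; }

    [JsonPropertyName("longField")]
    public long LongField { get; set; }
}

public class MyObject
{
    [JsonPropertyName("randomField")]
    public string? RandomField { get; set; }

    [JsonPropertyName("nestedFields")]
    public NestedField[]? NestedFields { get; set; }
}

// Hypothetical helper (not part of the Azure SDK): holds only the
// transformation, so it can be tested without any trigger binding.
public static class EventFlattener
{
    // Assumes the body is a UTF-8 JSON array of MyObject.
    // Returns one JSON string per nested field, in input order.
    public static List<string> Flatten(ReadOnlySpan<byte> body)
    {
        var outputs = new List<string>();
        MyObject[]? myObjects = JsonSerializer.Deserialize<MyObject[]>(body);
        if (myObjects == null)
        {
            return outputs;
        }

        foreach (MyObject myObject in myObjects)
        {
            // Skip null entries and objects without nested fields.
            if (myObject?.NestedFields == null)
            {
                continue;
            }

            foreach (NestedField nestedField in myObject.NestedFields)
            {
                outputs.Add(JsonSerializer.Serialize(nestedField));
            }
        }

        return outputs;
    }
}

public static class Program
{
    public static void Main()
    {
        // Sample payload invented for illustration only.
        byte[] body = Encoding.UTF8.GetBytes(
            "[{\"randomField\":\"a\",\"nestedFields\":[" +
            "{\"randomFieldNested\":\"x\",\"time\":\"2024-01-01T00:00:00+00:00\",\"longField\":1}," +
            "{\"randomFieldNested\":\"y\",\"time\":\"2024-01-02T00:00:00+00:00\",\"longField\":2}]}]");

        foreach (string json in EventFlattener.Flatten(body))
        {
            Console.WriteLine(json);
        }
    }
}
```

Inside the function, each returned string could then be wrapped with new EventData(Encoding.UTF8.GetBytes(json)) as in the code above. If the producer sends a single object rather than an array, as the original question's DeserializeObject<MyObject> call suggests, the helper would need to deserialize MyObject instead of MyObject[].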

