我正在创建一个由 EventHub 触发的 C# 隔离工作进程 Azure 函数,该函数会将带有标头的
EventData
列表输出到另一个 EventHub 中。
该函数在使用
string[]
绑定时运行良好,但是当我使用 EventData
中的 Azure.Messaging.EventHubs
绑定时,该函数会在控制台中抛出以下消息:
System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].
Executed 'Functions.MyFunction' (Failed, Id=4a2fcfa1-0042-4cb8-92d6-75289685b4dd, Duration=15ms)
System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].
<TargetFramework>net7.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
...
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.19.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.EventHubs" Version="5.5.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.14.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
using Azure.Messaging.EventHubs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using my.Models;
using Newtonsoft.Json;
namespace my.function
{
public class MyFunction
{
[EventHubOutput("EventHubOutput", Connection = "EventHubOutputConnectionString")]
[FixedDelayRetry(5, "00:00:10")]
[Function("MyFunction")]
public EventData[] Run([EventHubTrigger("EventHubInput", Connection = "EventHubInputConnectionString", IsBatched = true)] EventData[] eventDatas)
{
List<EventData> eventDataOutputs = new List<EventData> { };
// Receive events
foreach (EventData eventData in eventDatas)
{
// Serialize the event
string eventJsonBody = eventData.EventBody.ToString();
MyObject? myObject = JsonConvert.DeserializeObject<MyObject>(eventJsonBody);
// Append nested elements
if (myObject != null)
{
eventDataOutputs.AddRange(myObject.nestedElements.Select(nestedElement => new EventData(JsonConvert.SerializeObject(nestedElement))));
}
}
return eventDataOutputs.ToArray();
}
}
}
namespace my.Models
{
using Newtonsoft.Json;
public class MyObject
{
[JsonProperty("randomField")]
public string RandomField { get; set; }
[JsonProperty("nestedFields")]
public NestedField[] NestedFields { get; set; }
}
}
namespace my.Models
{
using Newtonsoft.Json;
public class NestedField
{
[JsonProperty("randomFieldNested")]
public string RandomField { get; set; }
[JsonProperty("time")]
public DateTimeOffset Time { get; set; }
[JsonProperty("longField")]
public long LongField { get; set; }
}
}
我尝试了较新的 EventHubs,并且收到了事件。
我对你的代码做了一些修改,它对我有用
Code:
[Function(nameof(Function1))]
[FixedDelayRetry(5, "00:00:10")]
[EventHubOutput("eventHubOutput", Connection = "EventHubConnectionAppSetting")]
public EventData[] Run([EventHubTrigger("eventhubinput", Connection = "EventHubConnectionAppSetting", IsBatched = true)] EventData[] events)
{
_logger.LogInformation($"Received {events.Length} events");
List<EventData> eventDataOutputs = new List<EventData>();
foreach (EventData eventData in events)
{
string eventJsonBody = Encoding.UTF8.GetString(eventData.EventBody);
MyObject[]? myObjects = JsonConvert.DeserializeObject<MyObject[]>(eventJsonBody);
if (myObjects != null)
{
foreach (MyObject myObject in myObjects)
{
if (myObject.NestedFields != null)
{
foreach (NestedField nestedField in myObject.NestedFields)
{
// Extract field values from the NestedField object
string randomFieldNested = nestedField.RandomField;
DateTimeOffset time = nestedField.Time;
long longField = nestedField.LongField;
// Create a new EventData instance with the extracted field values
string eventDataJson = JsonConvert.SerializeObject(new
{
RandomFieldNested = randomFieldNested,
Time = time,
LongField = longField
});
EventData newEventData = new EventData(Encoding.UTF8.GetBytes(eventDataJson));
// Add the new EventData to the output list
eventDataOutputs.Add(newEventData);
}
}
}
}
}
_logger.LogInformation("Data Processed.....");
return eventDataOutputs.ToArray();
}
Output: