我有一个 Azure Function,它使用 Orchestrator 从外部源同步产品。 直到今天,这个函数都是由 TimerTrigger 或 HttpTrigger 调用并传递 InventoryId,这限制了该函数同步特定产品。
现在我需要为其添加队列支持,但我无法将 DurableTaskClient 注入 IConsumer 内。
到目前为止我所做的一切都是有效的,但它根本不使用 MassTransit Consumer。对于生产者的制作方式,我们到处使用 MassTransit,我们不想改变行为并发送没有整个主体的简单消息。
可以以某种方式注入它,或者在最坏的情况下,根据我在这里所做的事情,以更干净的方式反序列化消息
[Function(nameof(MessageReceiverHandler))]
public async Task Run(
[ServiceBusTrigger("%QueueName%", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message, [DurableClient] DurableTaskClient client, CancellationToken cancellationToken)
{
var str = message.Body.ToString();
var definition = new { message = new { inventoryId =3} };
var res = JsonConvert.DeserializeAnonymousType(str, definition);
await receiver.HandleConsumer<ProductSyncConsumer>(options.Value.QueueName, message, cancellationToken);
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), res.message.inventoryId, cancellationToken);
logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", instanceId);
}
public class ProductSyncConsumer : IConsumer<ExternalProductSyncSingle>
{
private readonly ILogger<ProductSyncConsumer> _logger;
public ProductSyncConsumer(ILogger<ProductSyncConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<ExternalProductSyncSingle> context)
{
return Task.FromResult(context.Message.InventoryId);
// DurableTaskClient client = default(DurableTaskClient);
// string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), context.Message.InventoryId);
_logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", 5);
// return await client.CreateCheckStatusResponseAsync(req, instanceId);
}
}
要使用服务总线队列中收到的消息触发编排,您需要使用服务总线触发器,正如您正在使用的那样。
Mass Transit 消费者用于接收消息并处理它。 作为参考,请检查此文档。
您只需要在服务总线触发器中使用公共交通消费者正在使用的队列名称。 据我所知
DurableTaskClient
不能直接用于公共交通消费者。
这对我有用。
Function.cs
:
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
namespace FunctionApp16
{
public static class Function
{
[Function(nameof(Function))]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(Function));
logger.LogInformation("Saying hello.");
var outputs = new List<string>();
outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"));
outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "London"));
return outputs;
}
[Function(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name, FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("SayHello");
logger.LogInformation("Saying hello to {name}.", name);
return $"Hello {name}!";
}
[Function("servicebus")]
public static async Task Run(
[ServiceBusTrigger(queueName:"hello",Connection ="sb_conn")]string message,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("servicebus");
var msg = message.Body.ToString();
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(Function));
logger.LogInformation($"Message content: {msg}");
logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
}
}
}
OUTPUT
:您可以使用消息值并根据您的要求进行处理。
Message content: {
[2024-03-25T09:57:57.128Z] "messageId": "043e0000-7a6f-b022-cf98-08dc4cb20be6",
[2024-03-25T09:57:57.139Z] "requestId": null,
[2024-03-25T09:57:57.142Z] "correlationId": null,
[2024-03-25T09:57:57.145Z] "conversationId": "043e0000-7a6f-b022-d097-08dc4cb20be6",
[2024-03-25T09:57:57.147Z] "initiatorId": null,
[2024-03-25T09:57:57.151Z] "sourceAddress": "sb://durablefuncsb.servicebus.windows.net/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx?autodelete=300",
[2024-03-25T09:57:57.154Z] "destinationAddress": "sb://durablefuncsb.servicebus.windows.net/Contracts/Hello?type=topic",
[2024-03-25T09:57:57.157Z] "responseAddress": null,
[2024-03-25T09:57:57.159Z] "faultAddress": null,
[2024-03-25T09:57:57.163Z] "messageType": [
[2024-03-25T09:57:57.167Z] "urn:message:Contracts:Hello"
[2024-03-25T09:57:57.170Z] ],
[2024-03-25T09:57:57.172Z] "message": {
[2024-03-25T09:57:57.175Z] "name": "Vivek"
[2024-03-25T09:57:57.182Z] },
[2024-03-25T09:57:57.185Z] "expirationTime": null,
[2024-03-25T09:57:57.192Z] "sentTime": "2024-03-25T09:57:56.7333272Z",
[2024-03-25T09:57:57.195Z] "headers": {},
[2024-03-25T09:57:57.201Z] "host": {
[2024-03-25T09:57:57.203Z] "machineName": "xxxxxxxxxxx",
[2024-03-25T09:57:57.207Z] "processName": "MT_servicebus",
[2024-03-25T09:57:57.210Z] "processId": 15876,
[2024-03-25T09:57:57.215Z] "assembly": "MT_servicebus",
[2024-03-25T09:57:57.218Z] "assemblyVersion": "1.0.0.0",
[2024-03-25T09:57:57.221Z] "frameworkVersion": "6.0.28",
[2024-03-25T09:57:57.225Z] "massTransitVersion": "8.1.3.0",
[2024-03-25T09:57:57.232Z] "operatingSystemVersion": "Microsoft Windows NT 10.0.22631.0"
[2024-03-25T09:57:57.235Z] }
[2024-03-25T09:57:57.237Z] }