I'm new to MassTransit and plan to use the Durable Future and Routing Slip patterns together with Azure Service Bus. I have a web application with an endpoint defined as follows:
app.MapPost("/orders/submit", async (SubmitOrderDto submitOrderDto, IRequestClient<SubmitOrder> client, ILogger<Program> logger, CancellationToken cancellationToken) =>
    {
        try
        {
            Response response = await client.GetResponse<OrderCompleted, OrderFaulted>(new
            {
                submitOrderDto.OrderId
            }, cancellationToken);
            return response switch
            {
                (_, OrderCompleted completed) => Results.Ok(new
                {
                    completed.OrderId,
                    completed.Status,
                    completed.Created,
                    completed.Completed,
                }),
                (_, OrderFaulted faulted) => Results.BadRequest(new
                {
                    faulted.OrderId,
                    faulted.Description,
                    faulted.Created,
                    faulted.Faulted
                }),
                _ => Results.BadRequest()
            };
        }
        catch (Exception ex)
        {
            return Results.Accepted(value: new
            {
                submitOrderDto.OrderId,
                ex.Message
            });
        }
    })
    .Produces<OrderCompleted>()
    .Produces<OrderFaulted>(StatusCodes.Status400BadRequest)
    .Produces(StatusCodes.Status202Accepted)
    .ProducesValidationProblem(StatusCodes.Status409Conflict)
    .WithName("SubmitOrder").WithTags("OrderServiceAPI");
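There are also Future, Planner, and activity classes. My expectation is that when a request is submitted, it is first validated by an activity class, and any validation error is returned to the client. If the request passes validation, the process continues.
However, I get the following MassTransit exception: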
Timeout waiting for response, RequestId: 40040000-4446-107b-e828-08db2e658cb0
   at MassTransit.Clients.RequestClient`1.GetResponseInternal[T1,T2](SendRequestCallback request, CancellationToken cancellationToken, RequestTimeout timeout, RequestPipeConfiguratorCallback`1 callback) in /_/src/MassTransit/Clients/RequestClient.cs:line 212
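If I switch to RabbitMQ, the request is validated as I expect. With Azure Service Bus, however, it always fails with the timeout exception shown above. In Program.cs, I register MassTransit as follows: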
var builder = WebApplication.CreateBuilder(args);
builder.Services.TryAddScoped<IItineraryPlanner<SubmitOrder>, OrderItineraryPlannerAzure>();
builder.Services.AddMassTransit(x =>
{
    // Azure
    x.SetKebabCaseEndpointNameFormatter();
    x.AddServiceBusMessageScheduler();
    x.AddRequestClient<SubmitOrder>();
    //-------------
    x.ApplyCustomMassTransitConfiguration();
    x.AddDelayedMessageScheduler();
    x.AddActivitiesFromNamespaceContaining<OrderActivity>();
    x.AddFuturesFromNamespaceContaining<OrderFuture>();
    x.AddSagaRepository<FutureState>()
        .InMemoryRepository();
    //x.UsingRabbitMq((context, cfg) =>
    //{
    //    cfg.AutoStart = true;
    //    cfg.ApplyCustomBusConfiguration();
    //    cfg.UseDelayedMessageScheduler();
    //    cfg.ConfigureEndpoints(context);
    //});
    x.UsingAzureServiceBus((context, cfg) =>
    {
        using var scope = context.CreateScope();
        cfg.Host("Endpoint=sb://{connectionString}");
        cfg.UseTimeout(timeoutConfigurator =>
        {
            timeoutConfigurator.Timeout = TimeSpan.FromMinutes(10);
        });
        cfg.PrefetchCount = 100;
        cfg.LockDuration = TimeSpan.FromMinutes(5);
        cfg.MaxConcurrentCalls = 100;
        cfg.MaxDeliveryCount = 5;
        cfg.DefaultMessageTimeToLive = TimeSpan.FromDays(7);
        cfg.AutoStart = true;
        cfg.RequiresSession = true;
        cfg.UseServiceBusMessageScheduler();
        // deploy topology configuration when service starting
        cfg.DeployTopologyOnly = true;
        cfg.UseDelayedMessageScheduler();
        cfg.ConfigureEndpoints(context);
    });
});
I'm thinking that I need to create a consumer for the request client, but I don't know how to define one in the Futures pattern. Or am I missing a step?