MassTransit是一个免费的,开源的,轻量级的Microsoft .NET框架服务总线。
我已在 Azure 上成功使用 MassTransit,它为每个消费服务/应用程序生成一个“队列”,其中包含许多主题和订阅。 现在我已经将一些代码移植到 AWS,它似乎可以工作......
我有两个服务,一个充当消费者,另一个充当生产者。以下是我对它们每个的配置。 生产者配置 服务 .AddSingleton(KebabCaseEndpointNameFormatter.Ins...
有没有办法使用 MassTransit API 在 SessionIdleTimeout 之前关闭 Azure 服务总线会话?
我们已使用以下配置成功激活会话: cfg.ReceiveEndpoint(端点.Item1, e => { e.RequiresSession = true; e.SessionIdleTimeout = TimeSpan.FromMinute...
我在 .Net 应用程序上使用公共交通时遇到以下问题。 我有一个类实现 IConsumer<> 接口并使用来自 RabbitMQ 队列的消息。该应用程序作为 pod 运行...
我已经设置了事件ExampleEvent的批量使用者,并将其设置为ConcurrencyLimit = 16、MessageLimit = 100、TimeLimit = 5秒并按ExampleEvent.GroupingKey字段分组。 这很好用...
具有粘性会话的 MassTransit 和 RabbitMQ
是否可以使用 RabbitMQ 作为 MassTransit 下的传输来使用粘性会话进行请求/响应? 我使用 Azure ServiceBus 作为传输进行以下工作,但我们想重新...
在 MassTransit 中实现持久存储和事务发件箱的示例或文档?
我正在使用 RavenDb,希望从持久存储和事务发件箱中受益。我试图弄清楚这是如何实现的,但我找不到任何文档并正在寻找
假设我有一个消费者: 公共类 TestConsumer : IConsumer { ... 配置为使用路由密钥: cfg.ReceiveEndpoint("test_queue", e=> {...
无法从 Masstransit Saga 事件更新实体框架核心关系
我无法更新我的传奇实例事件中的实体关系。 我可以为条目创建一对一行,但是当我再次回到 IssueLocation 状态时,它没有检测到...
由于 MassTransit,应用程序无法启动。他必须自己注册他的依赖项
我的消费者有这个构造函数: public QueueProcessingService(ILogger记录器,IQueueRepository存储库,ISendEndpoint端点) { _logger=记录器; _回购...
不久前,我开始使用Mastransit,并面临着拦截传入消息的反序列化错误的需要。不幸的是,经过几天的尝试,我...
MassTransit Kafka 和发件箱:没有此类配置属性:“schema.registry.url”异常
我已经使用 Kafka 和发件箱模式设置了 MassTransit var confluenceConfiguration = builder.Configuration.GetSection("Kafka").ToConfluenceConfiguration(); builder.Services.AddSingleton<
如何通过 Dapper 将 MassTransit 连接到 Postgres 来存储 Sagas?
我在任何地方都找不到工作示例,但在 MassTransit 的码头上有一个 MSSQL 示例(它具有 SqlClient 依赖项,并且不读取 Postgre 的连接字符串)。或者是
Azure Functions 使用 MassTransit 发布到 RabbitMQ
我当前正在运行一个 Azure Function,该函数使用 Masstransit 发布到 Azure 服务总线。为了便于调试,我希望能够在本地运行该函数并发布到本地...
我正在使用 Masstransit 版本 7.3.1 和rabbitmq 来创建消息传递机制(我在 API 收到的每个请求中发送一条消息),并且我安装了 Greenpipes 库以便稍后使用它...
如何使用 MassTransit 设置 _error 队列中消息的生存时间?
是否可以为 _error 队列中的消息设置生存时间?或者甚至跳过这种将消息发送到错误队列的机制,因为错误已经被消耗了? 我正在使用消费者...
默认情况下,MT 在 RabbitMQ 中创建经典队列类型,我们正在考虑更改为仲裁队列(因为建议使用这些队列类型,而不是 RabbitMQ 文档中的经典镜像队列...
遇到MassTransit异常:System.Threading.Channels.ChannelClosedException:通道已关闭
我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。 我们正在使用这些 MassTransit 套餐 我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。 我们正在使用这些 MassTransit 套餐 <PackageReference Include="MassTransit" Version="7.2.2" /> <PackageReference Include="MassTransit.ActiveMQ" Version="7.2.2" /> <PackageReference Include="MassTransit.AspNetCore" Version="7.2.2" /> <PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.2.2" /> 我们有很多消费者,这样声明: services.AddMassTransit(busConfigurator => { busConfigurator.AddConsumer<CreateLeadConsumer>(); busConfigurator.AddConsumer<CreateOrderConsumer>(); busConfigurator.AddConsumer<InvoiceCreatedConsumer>(); busConfigurator.AddConsumer<PhoenixContractCreatedConsumer>(); busConfigurator.AddConsumer<PhoenixDocumentCreatedConsumer>(); ConfigureBus(busConfigurator, activeMqOptions); }); 我们定义了这些ActiveMqOptions var activeMqOptions = Configuration.GetSection(SodexoActiveMQOptions.SectionName).Get<SodexoActiveMQOptions>(); EndpointConvention.Map<InvoiceCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.InvoiceCreatedQueue))); EndpointConvention.Map<CompanyOrderDto>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrUpdateClientQueue))); EndpointConvention.Map<BeneficiaryWithProductDto>(new Uri(QueueFormatter.FormatToTopic(activeMqOptions.GlobalQueues.CreateOrUpdateBeneficiaryQueue))); EndpointConvention.Map<CreatePhoenixOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrderQueue))); EndpointConvention.Map<PhoenixOrderCreationMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.PhoenixCreateOrderQueue))); EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue))); EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue))); EndpointConvention.Map<DocumentCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.DocumentCreatedQueue))); 我们的消费者大部分时间都工作正常,有效负载消耗良好。 但有时会抛出异常: [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/{name of the queue}" System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken) at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) 由于我们收到此异常,消息将进入错误队列。 为了避免此异常,我在应用程序上添加了重试行为。当遇到异常时,消息将在队列中重新发送。在其他例外情况下它工作正常,但在这个例外情况下则不然。 一旦抛出 System.Threading.Channels.ChannelClosedException,消息就会重试并被消耗。但在消息被消费后,消息仍然会进入错误队列。 这是更清晰的日志视图,您将看到有一个模式: As you will see, the first time, the application try to publish the message to a queue, but a ChannelClosedException is thrown. The second time, the message is correctly published, because we can see final logs, and also because I checked with the targeted consumers. [22:39:13.647 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message --> First time we get the message [22:39:13.924 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message for enterprise number 754887949 [22:39:13.927 WRN] [] [MassTransit.ReceiveTransport] R-RETRY "activemq://localhost:61616/create-order" "8ee80000-56b0-0050-5f50-08da74c70f82" MassTransit.Context.RetryConsumeContext<MassTransitMessaging.CreateOrderMessage> System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at MassTransit.EndpointConventionExtensions.Send[T](ISendEndpointProvider provider, T message, CancellationToken cancellationToken) at ECommerce.Messaging.Consumers.MessageNotifier.NotifyAsync[T](T message, CancellationToken cancellationToken) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/MessageNotifier.cs:line 18 at ECommerce.Messaging.Consumers.CreateOrderConsumer.Consume(ConsumeContext`1 context) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/CreateOrderConsumer.cs:line 96 at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) [22:39:14.031 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message --> Second time we get the message [22:39:14.255 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message Phoenix reorder creation message is sent for enterprise number 754887949 [22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Phoenix reorder creation message is sent for enterprise number 754887949 [22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Reorder was created for company 754887949 --> Success, it means that the message has been correctly sent [22:39:14.291 ERR] [] [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/create-order" ID:CEBESVC-BA5AP08-54759-637950758237778584-1:0:1:1:1 "00:00:00.6544301" System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Internals.Extensions.TaskExtensions.<>c_DisplayClass4_0.<<OrCanceled>g_WaitAsync|0>d.MoveNext() — End of stack trace from previous location where exception was thrown — at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken) at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) --> The message has been sent BUT we get this error ! 这很烦人,因为我们必须检查错误队列中的每条消息是否是真正的错误。 不幸的是,我无法在本地环境中重现此行为。 您遇到的异常已在 MassTransit 版本 8.1.0 中修复,其中 ActiveMQ 库已更新到官方版本。更新到此版本或更新版本应该有助于解决此问题。
从 .Net WebAPI 连接到 Docker Swarm 中的 RabbitMQ/Masstransit 服务时出现问题
我刚开始在 Docker Swarm 中使用 RabbitMQ/Masstransit。从所述堆栈中的其他服务(C# .Net WebAPI)连接到我的堆栈中的 RabbitMQ 服务时遇到问题。连接到 RabbitMQ...
Masstransit Saga 状态机无法使用 .Net Core 中的 Azure 服务总线从消费者发送消息
我们正在使用 Masstransit 实现 Saga 状态机模式。 典型的结构是我们有不同的微服务执行数据库操作,Saga 状态机用于管理