masstransit 相关问题

MassTransit是一个免费的,开源的,轻量级的Microsoft .NET框架服务总线。

AWS 上的 MassTransit 队列

我已在 Azure 上成功使用 MassTransit,它为每个消费服务/应用程序生成一个“队列”,其中包含许多主题和订阅。 现在我已经将一些代码移植到 AWS,它似乎可以工作......

回答 1 投票 0

公共交通,未创建 saga 资源

我有两个服务,一个充当消费者,另一个充当生产者。以下是我对它们每个的配置。 生产者配置 服务 .AddSingleton(KebabCaseEndpointNameFormatter.Ins...

回答 2 投票 0

有没有办法使用 MassTransit API 在 SessionIdleTimeout 之前关闭 Azure 服务总线会话?

我们已使用以下配置成功激活会话: cfg.ReceiveEndpoint(端点.Item1, e => { e.RequiresSession = true; e.SessionIdleTimeout = TimeSpan.FromMinute...

回答 1 投票 0

当 Pod 关闭或重新启动时,公共交通取消消费者作业

我在 .Net 应用程序上使用公共交通时遇到以下问题。 我有一个类实现 IConsumer<> 接口并使用来自 RabbitMQ 队列的消息。该应用程序作为 pod 运行...

回答 1 投票 0

RabbitMQ MassTransit 批量竞争消费者

我已经设置了事件ExampleEvent的批量使用者,并将其设置为ConcurrencyLimit = 16、MessageLimit = 100、TimeLimit = 5秒并按ExampleEvent.GroupingKey字段分组。 这很好用...

回答 2 投票 0

具有粘性会话的 MassTransit 和 RabbitMQ

是否可以使用 RabbitMQ 作为 MassTransit 下的传输来使用粘性会话进行请求/响应? 我使用 Azure ServiceBus 作为传输进行以下工作,但我们想重新...

回答 1 投票 0

在 MassTransit 中实现持久存储和事务发件箱的示例或文档?

我正在使用 RavenDb,希望从持久存储和事务发件箱中受益。我试图弄清楚这是如何实现的,但我找不到任何文档并正在寻找

回答 1 投票 0

如何配置消费者生成的错误队列以使用与正常队列相同的路由键?

假设我有一个消费者: 公共类 TestConsumer : IConsumer { ... 配置为使用路由密钥: cfg.ReceiveEndpoint("test_queue", e=> {...

回答 1 投票 0

无法从 Masstransit Saga 事件更新实体框架核心关系

我无法更新我的传奇实例事件中的实体关系。 我可以为条目创建一对一行,但是当我再次回到 IssueLocation 状态时,它没有检测到...

回答 1 投票 0

由于 MassTransit,应用程序无法启动。他必须自己注册他的依赖项

我的消费者有这个构造函数: public QueueProcessingService(ILogger记录器,IQueueRepository存储库,ISendEndpoint端点) { _logger=记录器; _回购...

回答 1 投票 0

公共交通。处理反序列化异常

不久前,我开始使用Mastransit,并面临着拦截传入消息的反序列化错误的需要。不幸的是,经过几天的尝试,我...

回答 1 投票 0

MassTransit Kafka 和发件箱:没有此类配置属性:“schema.registry.url”异常

我已经使用 Kafka 和发件箱模式设置了 MassTransit var confluenceConfiguration = builder.Configuration.GetSection("Kafka").ToConfluenceConfiguration(); builder.Services.AddSingleton<

回答 1 投票 0

如何通过 Dapper 将 MassTransit 连接到 Postgres 来存储 Sagas?

我在任何地方都找不到工作示例,但在 MassTransit 的码头上有一个 MSSQL 示例(它具有 SqlClient 依赖项,并且不读取 Postgre 的连接字符串)。或者是

回答 1 投票 0

Azure Functions 使用 MassTransit 发布到 RabbitMQ

我当前正在运行一个 Azure Function,该函数使用 Masstransit 发布到 Azure 服务总线。为了便于调试,我希望能够在本地运行该函数并发布到本地...

回答 1 投票 0

Greenpipes 正在吃 CPU,而我没有使用它

我正在使用 Masstransit 版本 7.3.1 和rabbitmq 来创建消息传递机制(我在 API 收到的每个请求中发送一条消息),并且我安装了 Greenpipes 库以便稍后使用它...

回答 1 投票 0

如何使用 MassTransit 设置 _error 队列中消息的生存时间?

是否可以为 _error 队列中的消息设置生存时间?或者甚至跳过这种将消息发送到错误队列的机制,因为错误已经被消耗了? 我正在使用消费者...

回答 3 投票 0

具有法定队列的 MassTransit - 有任何问题吗?

默认情况下,MT 在 RabbitMQ 中创建经典队列类型,我们正在考虑更改为仲裁队列(因为建议使用这些队列类型,而不是 RabbitMQ 文档中的经典镜像队列...

回答 1 投票 0

遇到MassTransit异常:System.Threading.Channels.ChannelClosedException:通道已关闭

我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。 我们正在使用这些 MassTransit 套餐 我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。 我们正在使用这些 MassTransit 套餐 <PackageReference Include="MassTransit" Version="7.2.2" /> <PackageReference Include="MassTransit.ActiveMQ" Version="7.2.2" /> <PackageReference Include="MassTransit.AspNetCore" Version="7.2.2" /> <PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.2.2" /> 我们有很多消费者,这样声明: services.AddMassTransit(busConfigurator => { busConfigurator.AddConsumer<CreateLeadConsumer>(); busConfigurator.AddConsumer<CreateOrderConsumer>(); busConfigurator.AddConsumer<InvoiceCreatedConsumer>(); busConfigurator.AddConsumer<PhoenixContractCreatedConsumer>(); busConfigurator.AddConsumer<PhoenixDocumentCreatedConsumer>(); ConfigureBus(busConfigurator, activeMqOptions); }); 我们定义了这些ActiveMqOptions var activeMqOptions = Configuration.GetSection(SodexoActiveMQOptions.SectionName).Get<SodexoActiveMQOptions>(); EndpointConvention.Map<InvoiceCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.InvoiceCreatedQueue))); EndpointConvention.Map<CompanyOrderDto>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrUpdateClientQueue))); EndpointConvention.Map<BeneficiaryWithProductDto>(new Uri(QueueFormatter.FormatToTopic(activeMqOptions.GlobalQueues.CreateOrUpdateBeneficiaryQueue))); EndpointConvention.Map<CreatePhoenixOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrderQueue))); EndpointConvention.Map<PhoenixOrderCreationMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.PhoenixCreateOrderQueue))); EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue))); EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue))); EndpointConvention.Map<DocumentCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.DocumentCreatedQueue))); 我们的消费者大部分时间都工作正常,有效负载消耗良好。 但有时会抛出异常: [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/{name of the queue}" System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken) at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) 由于我们收到此异常,消息将进入错误队列。 为了避免此异常,我在应用程序上添加了重试行为。当遇到异常时,消息将在队列中重新发送。在其他例外情况下它工作正常,但在这个例外情况下则不然。 一旦抛出 System.Threading.Channels.ChannelClosedException,消息就会重试并被消耗。但在消息被消费后,消息仍然会进入错误队列。 这是更清晰的日志视图,您将看到有一个模式: As you will see, the first time, the application try to publish the message to a queue, but a ChannelClosedException is thrown. The second time, the message is correctly published, because we can see final logs, and also because I checked with the targeted consumers. [22:39:13.647 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message --> First time we get the message [22:39:13.924 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message for enterprise number 754887949 [22:39:13.927 WRN] [] [MassTransit.ReceiveTransport] R-RETRY "activemq://localhost:61616/create-order" "8ee80000-56b0-0050-5f50-08da74c70f82" MassTransit.Context.RetryConsumeContext<MassTransitMessaging.CreateOrderMessage> System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at MassTransit.EndpointConventionExtensions.Send[T](ISendEndpointProvider provider, T message, CancellationToken cancellationToken) at ECommerce.Messaging.Consumers.MessageNotifier.NotifyAsync[T](T message, CancellationToken cancellationToken) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/MessageNotifier.cs:line 18 at ECommerce.Messaging.Consumers.CreateOrderConsumer.Consume(ConsumeContext`1 context) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/CreateOrderConsumer.cs:line 96 at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next) at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) [22:39:14.031 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message --> Second time we get the message [22:39:14.255 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message Phoenix reorder creation message is sent for enterprise number 754887949 [22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Phoenix reorder creation message is sent for enterprise number 754887949 [22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Reorder was created for company 754887949 --> Success, it means that the message has been correctly sent [22:39:14.291 ERR] [] [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/create-order" ID:CEBESVC-BA5AP08-54759-637950758237778584-1:0:1:1:1 "00:00:00.6544301" System.Threading.Channels.ChannelClosedException: The channel has been closed. at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) at GreenPipes.Internals.Extensions.TaskExtensions.<>c_DisplayClass4_0.<<OrCanceled>g_WaitAsync|0>d.MoveNext() — End of stack trace from previous location where exception was thrown — at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken) at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next) --> The message has been sent BUT we get this error ! 这很烦人,因为我们必须检查错误队列中的每条消息是否是真正的错误。 不幸的是,我无法在本地环境中重现此行为。 您遇到的异常已在 MassTransit 版本 8.1.0 中修复,其中 ActiveMQ 库已更新到官方版本。更新到此版本或更新版本应该有助于解决此问题。

回答 1 投票 0

从 .Net WebAPI 连接到 Docker Swarm 中的 RabbitMQ/Masstransit 服务时出现问题

我刚开始在 Docker Swarm 中使用 RabbitMQ/Masstransit。从所述堆栈中的其他服务(C# .Net WebAPI)连接到我的堆栈中的 RabbitMQ 服务时遇到问题。连接到 RabbitMQ...

回答 1 投票 0

Masstransit Saga 状态机无法使用 .Net Core 中的 Azure 服务总线从消费者发送消息

我们正在使用 Masstransit 实现 Saga 状态机模式。 典型的结构是我们有不同的微服务执行数据库操作,Saga 状态机用于管理

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.