公共交通。处理反序列化异常

问题描述 投票:0回答:1

不久前,我开始使用 Mastransit,并面临着拦截传入消息的反序列化错误的需要。不幸的是,经过几天的尝试,我仍然没有成功。我正在寻求帮助。我来描述一下情况。

客户端向我的应用程序发送消息,交换通过 Kafka。 MassTransit 库的版本是 8.0.13。碰巧客户端向 Kafka 发送了无效的 Json,并且在我的服务中发生了错误。我需要拦截这个错误,并以某种格式向客户端的 Kafka 主题发送一条新消息。起初,我尝试通过过滤器和中间件来实现这一点,反序列化后发生的错误已经在消费者中,它们拦截,但没有反序列化错误,它发生在进入消费者之前。然后我决定编写一个自定义序列化器,它将调用 _topicProducer.Produce(message);但存在依赖性方面的困难。我需要一个 ServiceProvider,我通过 ServiceCollection.BuildServiceProvider() 构建它,然后从它接收必要的 TopicProducer 以用于自定义反序列化器,但错误是“The ReceivePipeConfiguration can only be use一次。”

请告诉我,也许我选择了错误的方法,需要采取不同的做法?如何拦截反序列化错误并据此采取一些操作?

这是我的配置:

public static void AddMassTransitConfiguration(
    this IServiceCollection services, 
    IConfiguration configuration)
{
    services.AddMassTransit(busCfg =>
    {
        busCfg.UsingInMemory((context, config) =>
        {
            config.ConfigureEndpoints(context);
        });

        busCfg.AddRider(rider =>
        {
            rider.AddConsumer<OrderConsumer>(); 
            rider.AddConsumer<OrderProcessingResultConsumer>();

            rider.AddProducer<OrderForProcessingMessage>(
                configuration
                    .GetSection("OrderForProcessingProducer")
                    .GetSection("Topic").Value,
                (context, cfg) =>
                {
                    cfg.SetValueSerializer(
                        new MassTransitJsonSerializer<OrderForProcessingMessage>());
                });
           
            rider.AddProducer<OrderResultMessage>(
                configuration.GetSection("OrderResultProducer").GetSection("Topic").Value,
                (context, cfg) =>
                {
                    cfg.SetValueSerializer(new MassTransitJsonSerializer<OrderResultMessage>()); 
                });

            var isDevelopment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") !=
                                Environments.Development;

            Action<IKafkaHostConfigurator>? kafkaHostConfigurator = isDevelopment ? hostConfig =>
                {
                    if (Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") !=
                        Environments.Development)
                    {
                        hostConfig.UseSasl(saslConf =>
                        {
                            saslConf.Mechanism = SaslMechanism.ScramSha256;
                            saslConf.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                            saslConf.Username = configuration.GetSection("Kafka")
                                .GetSection("User").Value;
                            saslConf.Password = configuration.GetSection("Kafka")
                                .GetSection("Password").Value;
                        });
                    }
                }
                : null;

            rider.UsingKafka((ctx, kafkaCfg) =>
            {
                SetMainConsumer(kafkaCfg, configuration, kafkaHostConfigurator, ctx);

                kafkaCfg.Host(configuration.GetSection("Kafka").GetSection("Host").Value, kafkaHostConfigurator);
                kafkaCfg.TopicEndpoint<OrderMessage>(
                    configuration.GetSection("OrderConsumer").GetSection("Topic").Value,
                    configuration.GetSection("OrderConsumer").GetSection("ConsumerGroup").Value,
                    endpointCfg =>
                    {
                        var serviceProvider = services.BuildServiceProvider();
                        var beeKeeperIntegrationService = 
                            serviceProvider.GetService<IBeeKeeperIntegrationService>();

                        endpointCfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                        endpointCfg.SetValueDeserializer(new CustromMassTransitJsonDeserializer(beeKeeperIntegrationService));
                        endpointCfg.ConfigureConsumer<OrderConsumer>(ctx);
                    }
                );

                kafkaCfg.TopicEndpoint<OrderProcessingResultMessage>(
                    configuration.GetSection("OrderProcessingResultConsumer").GetSection("Topic").Value,
                    configuration.GetSection("OrderProcessingResultConsumer").GetSection("ConsumerGroup").Value,
                    endpointCfg => 
                    {
                        endpointCfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                        endpointCfg.SetValueDeserializer(new MassTransitJsonDeserializer<OrderProcessingResultMessage>());
                        endpointCfg.ConfigureConsumer<OrderProcessingResultConsumer>(ctx);
                    }
                );
            });
        });
    });
}
.net exception apache-kafka deserialization masstransit
1个回答
0
投票

您可以将接收端点配置为原始 JSON,并使用任何消息类型进行反序列化。

endpoint.UseRawJsonSerializer();

使用以下链接获取详细信息:https://masstransit.io/documentation/configuration/serialization

© www.soinside.com 2019 - 2024. All rights reserved.