不久前,我开始使用 Mastransit,并面临着拦截传入消息的反序列化错误的需要。不幸的是,经过几天的尝试,我仍然没有成功。我正在寻求帮助。我来描述一下情况。
客户端向我的应用程序发送消息,交换通过 Kafka。 MassTransit 库的版本是 8.0.13。碰巧客户端向 Kafka 发送了无效的 Json,并且在我的服务中发生了错误。我需要拦截这个错误,并以某种格式向客户端的 Kafka 主题发送一条新消息。起初,我尝试通过过滤器和中间件来实现这一点,反序列化后发生的错误已经在消费者中,它们拦截,但没有反序列化错误,它发生在进入消费者之前。然后我决定编写一个自定义序列化器,它将调用 _topicProducer
请告诉我,也许我选择了错误的方法,需要采取不同的做法?如何拦截反序列化错误并据此采取一些操作?
这是我的配置:
public static void AddMassTransitConfiguration(
this IServiceCollection services,
IConfiguration configuration)
{
services.AddMassTransit(busCfg =>
{
busCfg.UsingInMemory((context, config) =>
{
config.ConfigureEndpoints(context);
});
busCfg.AddRider(rider =>
{
rider.AddConsumer<OrderConsumer>();
rider.AddConsumer<OrderProcessingResultConsumer>();
rider.AddProducer<OrderForProcessingMessage>(
configuration
.GetSection("OrderForProcessingProducer")
.GetSection("Topic").Value,
(context, cfg) =>
{
cfg.SetValueSerializer(
new MassTransitJsonSerializer<OrderForProcessingMessage>());
});
rider.AddProducer<OrderResultMessage>(
configuration.GetSection("OrderResultProducer").GetSection("Topic").Value,
(context, cfg) =>
{
cfg.SetValueSerializer(new MassTransitJsonSerializer<OrderResultMessage>());
});
var isDevelopment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") !=
Environments.Development;
Action<IKafkaHostConfigurator>? kafkaHostConfigurator = isDevelopment ? hostConfig =>
{
if (Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") !=
Environments.Development)
{
hostConfig.UseSasl(saslConf =>
{
saslConf.Mechanism = SaslMechanism.ScramSha256;
saslConf.SecurityProtocol = SecurityProtocol.SaslPlaintext;
saslConf.Username = configuration.GetSection("Kafka")
.GetSection("User").Value;
saslConf.Password = configuration.GetSection("Kafka")
.GetSection("Password").Value;
});
}
}
: null;
rider.UsingKafka((ctx, kafkaCfg) =>
{
SetMainConsumer(kafkaCfg, configuration, kafkaHostConfigurator, ctx);
kafkaCfg.Host(configuration.GetSection("Kafka").GetSection("Host").Value, kafkaHostConfigurator);
kafkaCfg.TopicEndpoint<OrderMessage>(
configuration.GetSection("OrderConsumer").GetSection("Topic").Value,
configuration.GetSection("OrderConsumer").GetSection("ConsumerGroup").Value,
endpointCfg =>
{
var serviceProvider = services.BuildServiceProvider();
var beeKeeperIntegrationService =
serviceProvider.GetService<IBeeKeeperIntegrationService>();
endpointCfg.AutoOffsetReset = AutoOffsetReset.Earliest;
endpointCfg.SetValueDeserializer(new CustromMassTransitJsonDeserializer(beeKeeperIntegrationService));
endpointCfg.ConfigureConsumer<OrderConsumer>(ctx);
}
);
kafkaCfg.TopicEndpoint<OrderProcessingResultMessage>(
configuration.GetSection("OrderProcessingResultConsumer").GetSection("Topic").Value,
configuration.GetSection("OrderProcessingResultConsumer").GetSection("ConsumerGroup").Value,
endpointCfg =>
{
endpointCfg.AutoOffsetReset = AutoOffsetReset.Earliest;
endpointCfg.SetValueDeserializer(new MassTransitJsonDeserializer<OrderProcessingResultMessage>());
endpointCfg.ConfigureConsumer<OrderProcessingResultConsumer>(ctx);
}
);
});
});
});
}
您可以将接收端点配置为原始 JSON,并使用任何消息类型进行反序列化。
endpoint.UseRawJsonSerializer();
使用以下链接获取详细信息:https://masstransit.io/documentation/configuration/serialization