通过基类发布时未执行任何使用者代码

问题描述 投票:0回答:1

我有一个基类IntegrationEvent和所有从其继承的其他业务事件。

    public abstract class IntegrationEvent
    {
        public Guid Id { get; private set; }
        public DateTimeOffset OccuredOn { get; private set; }

        protected IntegrationEvent()
        {
            this.Id = Guid.NewGuid();
            this.OccuredOn = DateTimeOffset.Now;
        }
    }

    public sealed class StudentRegisteredIntegrationEvent : IntegrationEvent
    {
        public Guid StudentId { get; set; }
        public string FullName { get; set; }

        public StudentRegisteredIntegrationEvent(Guid studentId, string fullName)
        {
            StudentId = studentId;
            FullName = fullName;
        }
    }

然后我创建了一个消费者:


 public sealed class StudentRegisteredConsumer: IConsumer<StudentRegisteredIntegrationEvent>
    {
        private readonly ILogger<StudentRegisteredConsumer> _logger;

        public StudentRegisteredConsumer(ILogger<StudentRegisteredConsumer> logger)
        {
            _logger = logger;
        }


        public Task Consume(ConsumeContext<StudentRegisteredIntegrationEvent> context)
        {
            _logger.LogWarning("========== Message Received +==========================");
            _logger.LogInformation($"Sending notification to {context.Message.FullName}");
            _logger.LogWarning("========== Message Received +==========================");
            return Task.CompletedTask;
        }
    }

在生产者端,我有一个List<IntegrationEvent>列表,我通过publishIPublishEndpoint他们,但它没有路由到正确的队列,而是仅创建了另一个交换Sample.Abstraction.Domain:IntegrationEvent。如何告诉MassTransit不要使用基类,而要使用实型类?我也尝试过使用ISendEndpointProvider,但是由于没有可供student-registered-integration-event_skipped类使用的使用者,它们又被路由到另一个队列base队列。

这是用户端的日志:


[00:33:21 DBG] Configuring endpoint student-registered-integration-event, Consumer: Sample.University.Notification.Consumers.StudentRegisteredConsumer
[00:33:21 DBG] Declare exchange: name: student-registered-integration-event, type: fanout, durable
[00:33:21 DBG] Declare exchange: name: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, type: fanout, durable
[00:33:21 DBG] Bind exchange: source: Sample.IntegrationEvents:StudentRegisteredIntegrationEvent, destination: student-registered-integration-event
[00:33:21 DBG] Declare queue: name: student-registered-integration-event, durable
[00:33:21 DBG] Bind queue: source: student-registered-integration-event, destination: student-registered-integration-event
[00:33:21 DBG] Prefetch Count: 16
[00:33:21 DBG] Consumer Ok: rabbitmq://localhost/wrapperizer/student-registered-integration-event - amq.ctag-nqSrJ0A5UQZCXg3tIr9Hfg

我不知道如何配置,我也使用了ConsumerDefinition<StudentRegisteredConsumer>,但无济于事,这是代码:

    public sealed class StudentRegisteredConsumerDefinition : ConsumerDefinition<StudentRegisteredConsumer>
    {
        public StudentRegisteredConsumerDefinition()
        {
            const string eventName = nameof(StudentRegisteredIntegrationEvent);
            var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
            this.EndpointName = sanitized;
        }
    }

在生产者端获取发送端点的uri:

var eventName = logEvt.IntegrationEvent.GetType().Name;
                    var sanitized = KebabCaseEndpointNameFormatter.Instance.SanitizeName(eventName);
var uri = new Uri($"exchange:{sanitized}");
var sender = await _sendEndpointProvider.GetSendEndpoint(uri);
await sender.Send(logEvt.IntegrationEvent);

我知道上面的代码有点像MT的默认行为,但是没有它,我就没有正确的队列和交换。任何解决方案将不胜感激。

.net-core rabbitmq masstransit rabbitmq-exchange
1个回答
0
投票

首先,您可以完全使用Publish来完成此操作,无需配置您为解决此问题所做的所有事情。您可以按照约定配置使用者,并让他们在自己的端点上进行配置。您错过的一部分是消息的发布方式。

[从您的List<IntegrationEvent>开始,我怀疑您是在打电话给Publish<T>(T message),其中T被推断为IntegrationEvent。这就是为什么您只在该交易所收到消息的原因。您需要使用Publish(object message)重载,以便确定适当的类型。

您可以简单地将列表中的每个消息分配给对象,然后使用该对象调用Publish。或者,您可以强制使用重载:

await Task.WhenAll(events.Select(x => bus.Publish((object)x, x.GetType()));

这样,MassTransit将使用对象类型来调用适当的通用重载。

© www.soinside.com 2019 - 2024. All rights reserved.