nServiceBus 无效订阅

问题描述 投票:0回答:1

我正在利用 nServiceBus 和 SQL Server 设置一个项目,采用发布/订阅模型,但遇到了一个毫无意义的问题。我有两条消息 -

OpportunityMessage
PageVisitMessage
,这两个消息都是标有
IEvent
标记的标准 POCO 类,如下所示:

public class OpportunityMessage : IEvent
{
    /// <summary>
    /// Gets or sets the name of the form
    /// </summary>
    public string FormName { get; set; }

    /// <summary>
    /// Gets or sets the page identifier
    /// </summary>
    public byte[] PageIdentifier { get; set; }

    /// <summary>
    /// Gets or sets any supplemental data with a form
    /// </summary>
    public string SupplementalData { get; set; }
}

public class PageVisitMessage : IEvent
{
    /// <summary>
    /// Gets or sets the IP address of the requestor
    /// </summary>
    public string IpAddress { get; set; }

    /// <summary>
    /// Gets or sets the URL that generated a page visit
    /// </summary>
    public string NavigateUrl { get; set; }

    /// <summary>
    /// Gets or sets the referrer
    /// </summary>
    public string Referrer { get; set; }

    /// <summary>
    /// Gets or sets the user agent
    /// </summary>
    public string UserAgent { get; set; }
}

从发布的角度来看,一切都按预期进行 - 消息根据我配置的订阅表中存储的内容发布;然而,这就是我感到困惑的地方。每种消息类型都注定要发送到不同的队列,

messaging_opportunity_in
messaging_visitors_in
,但每当任一消息被触发时,它都会出现在两个队列中。

我做了一些挖掘并查看了配置的订阅表,每条消息都配置为进入两个队列,因此它出现在两个队列中是有意义的;然而,从代码的角度来看,它并不是这样设置的。每种消息类型都有一个专用的“处理程序” - 一个用于

PageVisitMessage
,另一个用于
OpportunityMessage
。这些的类结构如下所示:

public class OpportunityMessageHandler : BaseMessageHandler, IHandleMessages<OpportunityMessage>
{   
    //configured in appsettings.json as "messaging_opportunity_in"
    public override string QueueName => this.Configuration["Messaging:OpportunityPublishQueue"];

    public Task Handle(OpportunityMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}

public class VisitorMessageHandler : BaseMessageHandler, IHandleMessages<PageVisitMessage>
{   
    //configured in appsettings.json as "messaging_visitors_in"
    public override string QueueName => this.Configuration["Messaging:VisitorPublishQueue"];

    public Task Handle(OpportunityMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}

其中每一个都继承自

BaseMessageHandler
,其中端点被配置为通过如下所示的
Initialize
方法侦听配置的队列名称:

    /// <summary>
    /// Initializes this message handler.
    /// </summary>
    /// <param name="configuration">The <see cref="IConfiguration"/> instance containing the application's configuration.</param>
    public async Task Initialize(IConfiguration configuration)
    {
        this.Configuration = configuration;

        //setup our base listener
        var listener = new EndpointConfiguration(this.QueueName);
        listener.EnableInstallers();
        listener.SendFailedMessagesTo($"{this.QueueName}_errors");

        //configure the SQL Server transport
        var transport = new SqlServerTransport(Configuration.GetConnectionString("CS"))
        {
            DefaultSchema = this.DefaultSchemaName
        };

        transport.SchemaAndCatalog.UseSchemaForQueue($"{this.QueueName}_errors", this.DefaultSchemaName);
        listener.UseTransport(transport);

        //configure the subscription listener
        transport.Subscriptions.DisableCaching = true;
        transport.Subscriptions.SubscriptionTableName = new NServiceBus.Transport.SqlServer.SubscriptionTableName(this.Configuration["Messaging:Subscriptions"], schema: this.DefaultSchemaName);

        //spin it up
        this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
    }

我想要追踪的是为什么这两个处理程序被设置为监听它们未配置的消息?当我运行启动这些消息的后端消息处理器时,配置的订阅表会添加两个端点(

messaging_opportunities_in
messaging_visitors_in
)来监听所有存在的消息,无论它们的处理程序配置为监听特定的消息队列而不是全部。这本质上意味着每个消息在两个队列之间都是重复的,并且由于处理程序正在运行,因此它们最终会处理所有内容两次,即使每个消息应该单独负责自己的消息类型。

任何帮助将不胜感激!

publish-subscribe nservicebus
1个回答
0
投票

NServiceBus 文档网站上没有任何地方与您使用的消息处理程序上的 BaseMessageHandler

 类类似。但这就是问题开始的地方。

您没有提供所有代码,因此您引用

BaseMessageHandler

 但仅显示 
Initialize()
 方法。我假设这个方法位于 
BaseMessageHandler
 类内部。但您共享的代码没有显示如何以及何时调用该方法。

让我们深入了解这一行:

var listener = new EndpointConfiguration(this.QueueName);
首先,对象

listener

应该被称为
configuration
endpointConfiguration
,因为它永远不会听任何东西。此外,构造函数中的参数是 
endpointName
 并且不需要队列名称。例如,在 SQL Server 中,您的实际队列名称类似于 
opportunity@dbo@nservicebus

既然您提到了

messaging_opportunity_in

messaging_visitors_in
,在没有任何其他上下文的情况下,我会将端点命名为 
opportunities
visitors

现在让我们看看下面这一行:

this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);

Endpoint.Start()

方法返回实际的端点实例。正在尝试处理消息的事物。但在启动之前,它会对继承 
IHandleMessages<T>
 接口的所有类型进行一些扫描。如果 
Type<T>
 是一个事件(在您的情况下,类标记为 
IEvent
),它将为该事件创建订阅。

现在,我不知道

如何执行这些基类上的Initialize()

方法,因为您没有包含该代码。但你执行了两次。一次用于 
opportunity
,一次用于 
visitors
。这意味着 
opportunity
 端点开始扫描实现 
IMessageHandler<T>
 的类并找到两个。

  1. OpportunityMessageHandler
    
    
  2. VisitorMessageHandler
    
    
它看到两条消息,都是事件。

  1. OpportunityMessage
    
    
  2. PageVisitMessage
    
    
端点

opportunity

 端点存储它想要订阅这两条消息的事实。导致您看到的订阅。

接下来发生的事情是

visitors

端点启动并执行
完全相同的事情。它开始扫描,找到 2 个消息处理程序和 2 个事件,并创建另一个订阅。

如何解决这个问题

这假设您需要 2 个组件来在其中发送消息

    从消息处理程序中丢失基类
  1. 创建一个新的 Visual Studio 项目
    1. 添加其中一个端点的初始化代码;要么是访客,要么是机会
    2. 从第一个项目中删除正确的消息处理程序并将其移动到这个新项目
  2. 像现在一样运行 Initialize 方法,但每个项目仅运行一次
  3. 开始从
  4. visitors
    opportunities
     发送消息,反之亦然。
我建议遵循

NServiceBus 网站上的教程联系 NServiceBus 制造商特定软件。如果您提到我的名字,我们可以拨打电话并一起完成此过程,如果您愿意,您甚至可以请求免费帮助并提供概念证明

© www.soinside.com 2019 - 2024. All rights reserved.