我正在使用SQLTransport遍历NServiceBus上的示例,在此示例中,我显然不正确理解传输的概念,因为在其示例应用程序之后没有看到预期的行为,可以在这里找到它们:https://docs.particular.net/samples/sqltransport-sqlpersistence/
我在查看示例数据消息OrderSubmitted
的存储位置时遇到麻烦。我的假设是,由于我使用的是SqlTransport,因此我应该能够看到存储在表示OrderSubmitted
项目队列的表(Samples.Sql.Sender)中的消息。示例中有一个发送方和接收方,并且我已禁用了接收方,以便可以验证消息是否存储在SQL Server的表(队列)中。发件人启动时,我看到一个sender.Samples.Sql.Sender
表出现。我假设这是此端点发送的消息的队列。但是,当您发送消息时,在sender.Samples.Sql.Sender
表(或与此相关的任何表)中都没有看到记录,但是它表明OrderSubmitted已发布,所以它去了哪里?
本质上,我想使用NServiceBus实现生产者/消费者模式。我有生产者将在持久队列上生成消息。然后,我有一个或多个工作服务器,这些工作服务器将从队列中取出项目并进行处理。因为我有多个工作服务器都在同一个队列中执行工作,所以我需要确保其中只有一个可以处理任何给定消息(我不需要主题,我只想简单地实现一个持久队列,其中许多工作进程可以帮助处理队列中的项目。NServiceBus可以用于这种类型的用例。看起来可以,但是我在如何配置这种类型的环境上有点挣扎。应该如何配置它被视为工作队列,而不是每个实例都将尝试处理同一消息的主题?
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Transport.SQLServer;
public static class Program
{
static Random random;
public static async Task Main()
{
random = new Random();
Console.Title = "Samples.Sql.Sender";
var endpointConfiguration = new EndpointConfiguration("Samples.Sql.Sender");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
#region SenderConfiguration
var connection = @"Data Source=MySqlServer;Initial Catalog=NServiceBus;Integrated Security=True;Max Pool Size=100";
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.DefaultSchema("sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");
transport.NativeDelayedDelivery().DisableTimeoutManagerCompatibility();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
var subscriptions = transport.SubscriptionSettings();
subscriptions.SubscriptionTableName(
tableName: "Subscriptions",
schemaName: "dbo");
#endregion
SqlHelper.CreateSchema(connection, "sender");
var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
Console.WriteLine("Press enter to send a message");
Console.WriteLine("Press any key to exit");
while (true)
{
var key = Console.ReadKey();
Console.WriteLine();
if (key.Key != ConsoleKey.Enter)
{
break;
}
var orderSubmitted = new OrderSubmitted
{
OrderId = Guid.NewGuid(),
Value = random.Next(100)
};
await endpointInstance.Publish(orderSubmitted)
.ConfigureAwait(false);
Console.WriteLine("Published OrderSubmitted message");
}
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
在这里看看,也许它具有您想要的详细信息? https://docs.particular.net/transports/sql/#how-it-works
一些概念:
端点有一个队列在代理传输(例如SQL,Azure,Rabbit MQ)中发送消息时,消息将保存在目标终结点队列名称中目标端点运行后,它将读取并删除该队列中的消息...
很高兴为您提供更多帮助,只需直接给我发送电子邮件[email protected]