我正在使用 MassTransit(基于 RabbitMQ),通过 IBusControl 对象和仅 1 个队列来实现请求/响应模式。
我们将运行 1 个请求发起者应用程序(当前是单元测试)和多个消费者(控制台)应用程序。每个消费者应用程序只会响应具有给定 MyId(命令行参数)的消息。
我假设我应该通过向 ReceiveEndpoint 定义添加过滤器来完成此任务。我正在使用最新的 MassTransit (8.1.3.0)。
我尝试在下面的代码中创建一个 ConsumeContext 过滤器 (MyFilter1)(e.UseFilter(new MyFilter1(myId));),但这个过滤器从未被命中。现在我正在尝试创建一个 ConsumerConsumeContext 过滤器 (MyFilter)。我相信这才是我想要的过滤器类型。
我发现的所有代码示例似乎都在使用旧版本的 MassTransit,并且它们能够调用 UseConsumeFilter 传入新的过滤器对象。
像这样...
e.UseConsumeFilter(new MyFilter1(myId));
e.UseConsumeFilter 现在接受类型和注册上下文,而不是过滤器对象。
我可以向 Program 类添加一个静态变量并给它一个 MyId。
但是我应该发送什么作为注册上下文?
或者有没有更好的方法来过滤掉不适用的消息?
此外,如果我让过滤器正常工作,MassTransit 会接受此请求并将其发送给下一个消费者还是会丢失?
这是代码。为了便于阅读,我更改了类名并删除了所有 try/catch 块和一些函数。
消费者应用程序
// Entry point of the consumer console application. Takes the id to serve from the first
// command-line argument, reads the RabbitMQ settings from AppSettings and configures a bus
// with a single receive endpoint hosting MyConsumer.
// NOTE(review): this listing was trimmed for the question; the visible code never starts the
// bus and has no return on the success path — presumably both live in the removed parts.
static async Task<int> Main(string[] args)
{
Console.WriteLine("Consumer started ...");
// The id is mandatory; without it the application exits with -1.
if (args.Length == 0)
{
Console.WriteLine("You have given no command-line arguments. Expected <myId> ...");
return -1;
}
string myId = args[0];
// Connection settings come from the application configuration file.
string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"];
Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUserName);
h.Password(rabbitPassword);
});
// Every consumer instance uses the same queue name here, regardless of myId.
x.ReceiveEndpoint(rabbitQueue,
e =>{
e.Consumer<MyConsumer>( );
// Placeholder kept from the question: presumably where the per-id filter registration should go.
!!!!INSERT LINE HERE!!!!
//the following line never hits the filter before consumer
//gets called
//e.UseFilter(new MyFilter1(myId));
//e.UseConsumeFilter(typeof (MyFilter),null);
}
);
});
Console.WriteLine($"Finished Creating Bus..MyId={myId}");
}
/// <summary>
/// Pipeline filter for <c>ConsumerConsumeContext&lt;MyConsumer, IMyRequest&gt;</c> that passes a
/// message on to the next pipe only when its MyId equals the id given at construction.
/// Messages with any other id are not forwarded.
/// </summary>
public class MyFilter :
    IFilter<ConsumerConsumeContext<MyConsumer, IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should reach the consumer.</param>
    public MyFilter(string myId) => MyId = myId;

    /// <summary>Forwards <paramref name="context"/> to <paramref name="next"/> when the ids match.</summary>
    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    /// <summary>Records the configured id in a filter scope named "MyId".</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("MyId").Add("MyId", MyId);
    }
}
/// <summary>
/// Pipeline filter for <c>ConsumeContext&lt;IMyRequest&gt;</c> that passes a message on to the
/// next pipe only when its MyId equals the id given at construction.
/// Both interface members are implemented explicitly.
/// </summary>
public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should be forwarded.</param>
    public MyFilter1(string myId) => MyId = myId;

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    // Intentionally reports nothing to the probe.
    void IProbeSite.Probe(ProbeContext context)
    {
    }
}
/// <summary>
/// Consumes <c>MyRequest</c> messages and replies to the requester with a <c>MyResponse</c>
/// that echoes the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    /// <summary>Logs the incoming request, sends the response and logs that it was sent.</summary>
    /// <param name="context">Consume context carrying the request message.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . . (elided in the original listing)

        // Await the response so a send failure surfaces in this consumer instead of being
        // dropped, and so the "sent" log line below is only written after the send completed.
        // NOTE(review): the MyResponse type as listed has no ResponseStatus property and its
        // enum member is spelled "passed" — presumably a renaming artifact; confirm.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}
请求发起者应用程序
const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
{
h.Username(RABBIT_USER_NAME);
h.Password(RABBIT_PASSWORD);
}));
TaskUtil.Await(() => bus.StartAsync());
var serviceAddress = new Uri(RABBIT_SERVICE);
IRequestClient<LightupRequest> client =
busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));
MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
request.MyId = myId;
request.sid = 123456;
request.tid = 123457;
request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
request.Token = "111111111111111111111111111111";
request.Version = "10.02.003.0004";
Task.Run(async () =>
{
request.StartTime = DateTime.Now;
var response = await client.GetResponse<MyResponse>(request);
//do something with response
}).Wait();
}
请求/响应类型
/// <summary>Contract for request messages that carry a MyId.</summary>
public interface IMyRequest
{
/// <summary>Id that the filters in this listing compare against their configured id.</summary>
string MyId { get; set; }
}
/// <summary>Request message sent by the requester application; implements IMyRequest.</summary>
// NOTE(review): the requester code in this listing assigns request.sid and request.tid, which
// are not declared here (someId/someId2 are) — presumably a renaming artifact; confirm.
public class MyRequest : IMyRequest
{
public string MyId { get; set; }
public string Token { get; set; }
public string Data { get; set; }
public int someId { get; set; }
public int someId2 { get; set; }
public string Version { get; set; }
// Set by the requester immediately before the request is sent.
public DateTime StartTime { get; set; }
}
/// <summary>Response message returned by MyConsumer.</summary>
// NOTE(review): the class declares the ResponseStatus enum but no property of that type, while
// MyConsumer's initializer assigns `ResponseStatus = ResponseStatus.Passed` — the listing is
// inconsistent here (member is spelled "passed"); confirm against the real type.
public class MyResponse
{
public enum ResponseStatus { passed, failed, timeout, networkError }
public string MyId { get; set; }
public string ErrorData { get; set; }
}
我尝试在下面的代码中创建一个 ConsumeContext 过滤器 (MyFilter1) (e.UseFilter(new MyFilter1(myId));)
我希望这个过滤器会在消费者之前被命中,这样如果 ID 不是我期望的 ID,它可以阻止该消费者消费该消息。
我在 MyFilter1.Send 上设置了断点,但它从未命中。
此外,当我运行订阅者应用程序的 4 个实例时,我可以看到 MassTransit 仅将消息分发到 4 个可执行文件中的 1 个,并且下一条消息将位于不同的可执行文件上。
我更改了队列名称以包含 myId,并且添加了绑定
消费者应用程序
// Entry point of the consumer console application (per-id queue variant). The queue name is
// the configured name with ".<myId>" appended, so each id gets its own receive endpoint.
// NOTE(review): the listing was trimmed for the question; starting the bus and the
// success-path return are not visible here — presumably part of the removed code.
static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments. Expected <myId> ...");
        return -1;
    }

    string myId = args[0];

    // All connection settings come from the application configuration file.
    var settings = System.Configuration.ConfigurationManager.AppSettings;
    string rabbitHost = settings["RabbitMQ_Host"];
    string rabbitUserName = settings["RabbitMQ_UserName"];
    string rabbitPassword = settings["RabbitMQ_Password"];
    string rabbitQueue = string.Concat(settings["RabbitMQ_QueueName"], ".", myId);
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");

    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(new Uri(rabbitHost), host =>
        {
            host.Username(rabbitUserName);
            host.Password(rabbitPassword);
        });

        cfg.ReceiveEndpoint(rabbitQueue, endpoint =>
        {
            endpoint.Consumer<MyConsumer>();
            endpoint.Bind<MyConsumer>();
        });
    });

    Console.WriteLine($"Finished Creating Bus..MyId={myId}");
}
/// <summary>
/// Pipeline filter for <c>ConsumerConsumeContext&lt;MyConsumer, IMyRequest&gt;</c> that passes a
/// message on to the next pipe only when its MyId equals the id given at construction.
/// Messages with any other id are not forwarded.
/// </summary>
public class MyFilter :
    IFilter<ConsumerConsumeContext<MyConsumer, IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should reach the consumer.</param>
    public MyFilter(string myId) => MyId = myId;

    /// <summary>Forwards <paramref name="context"/> to <paramref name="next"/> when the ids match.</summary>
    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    /// <summary>Records the configured id in a filter scope named "MyId".</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("MyId").Add("MyId", MyId);
    }
}
/// <summary>
/// Pipeline filter for <c>ConsumeContext&lt;IMyRequest&gt;</c> that passes a message on to the
/// next pipe only when its MyId equals the id given at construction.
/// Both interface members are implemented explicitly.
/// </summary>
public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should be forwarded.</param>
    public MyFilter1(string myId) => MyId = myId;

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    // Intentionally reports nothing to the probe.
    void IProbeSite.Probe(ProbeContext context)
    {
    }
}
/// <summary>
/// Consumes <c>MyRequest</c> messages and replies to the requester with a <c>MyResponse</c>
/// that echoes the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    /// <summary>Logs the incoming request, sends the response and logs that it was sent.</summary>
    /// <param name="context">Consume context carrying the request message.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . . (elided in the original listing)

        // Await the response so a send failure surfaces in this consumer instead of being
        // dropped, and so the "sent" log line below is only written after the send completed.
        // NOTE(review): the MyResponse type as listed has no ResponseStatus property and its
        // enum member is spelled "passed" — presumably a renaming artifact; confirm.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}
请求发起者应用程序
const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
{
h.Username(RABBIT_USER_NAME);
h.Password(RABBIT_PASSWORD);
}));
TaskUtil.Await(() => bus.StartAsync());
MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
IRequestClient<LightupRequest> client =
busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));
request.MyId = myId;
request.sid = 123456;
request.tid = 123457;
request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
request.Token = "111111111111111111111111111111";
request.Version = "10.02.003.0004";
Task.Run(async () =>
{
request.StartTime = DateTime.Now;
var response = await client.GetResponse<MyResponse>(request);
//do something with response
}).Wait();
}
请求/响应类型(这些没有改变)
/// <summary>Contract for request messages that carry a MyId.</summary>
public interface IMyRequest
{
/// <summary>Id that the filters in this listing compare against their configured id.</summary>
string MyId { get; set; }
}
/// <summary>Request message sent by the requester application; implements IMyRequest.</summary>
// NOTE(review): the requester code in this listing assigns request.sid and request.tid, which
// are not declared here (someId/someId2 are) — presumably a renaming artifact; confirm.
public class MyRequest : IMyRequest
{
public string MyId { get; set; }
public string Token { get; set; }
public string Data { get; set; }
public int someId { get; set; }
public int someId2 { get; set; }
public string Version { get; set; }
// Set by the requester immediately before the request is sent.
public DateTime StartTime { get; set; }
}
/// <summary>Response message returned by MyConsumer.</summary>
// NOTE(review): the class declares the ResponseStatus enum but no property of that type, while
// MyConsumer's initializer assigns `ResponseStatus = ResponseStatus.Passed` — the listing is
// inconsistent here (member is spelled "passed"); confirm against the real type.
public class MyResponse
{
public enum ResponseStatus { passed, failed, timeout, networkError }
public string MyId { get; set; }
public string ErrorData { get; set; }
}
当消费者应用程序运行时,在 RabbitMQ 上创建以下内容
当我使用新的 myId 运行消费者应用程序的第二个实例时,以下是添加的内容
1 个连接
2 个通道
交换器.<2nd Id>:绑定自 交换器 MyConsumer(无路由键)和 交换器 IMyRequest(无路由键),绑定到 队列.<2nd Id>
队列.<2nd Id>:绑定自 交换器.<2nd Id>(无路由键)
交换器 MyConsumer:新增了一个指向 交换器.<2nd Id> 的绑定
交换器 IMyRequest:新增了一个指向 交换器.<2nd Id> 的绑定
我真的很喜欢这种模式的简单使用,这是确保您的请求不会陷入黑洞的简单方法。 但是,每个 id 创建 1 个队列和 1 个交换器。每次运行消费者应用程序时,我都会看到 2 个通道和 1 个连接。 这是确保我们的回复不会丢失的最佳方法吗?
Chris Patterson 回答后,这是最终代码
消费者应用程序
// Entry point of the consumer console application (per-id queue variant). The queue name is
// the configured name with ".<myId>" appended, so each id gets its own receive endpoint.
// NOTE(review): the listing was trimmed for the question; starting the bus and the
// success-path return are not visible here — presumably part of the removed code.
static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments. Expected <myId> ...");
        return -1;
    }

    string myId = args[0];

    // All connection settings come from the application configuration file.
    var settings = System.Configuration.ConfigurationManager.AppSettings;
    string rabbitHost = settings["RabbitMQ_Host"];
    string rabbitUserName = settings["RabbitMQ_UserName"];
    string rabbitPassword = settings["RabbitMQ_Password"];
    string rabbitQueue = string.Concat(settings["RabbitMQ_QueueName"], ".", myId);
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");

    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(new Uri(rabbitHost), host =>
        {
            host.Username(rabbitUserName);
            host.Password(rabbitPassword);
        });

        cfg.ReceiveEndpoint(rabbitQueue, endpoint =>
        {
            endpoint.Consumer<MyConsumer>();
            endpoint.Bind<MyConsumer>();
        });
    });

    Console.WriteLine($"Finished Creating Bus..MyId={myId}");
}
/// <summary>
/// Pipeline filter for <c>ConsumerConsumeContext&lt;MyConsumer, IMyRequest&gt;</c> that passes a
/// message on to the next pipe only when its MyId equals the id given at construction.
/// Messages with any other id are not forwarded.
/// </summary>
public class MyFilter :
    IFilter<ConsumerConsumeContext<MyConsumer, IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should reach the consumer.</param>
    public MyFilter(string myId) => MyId = myId;

    /// <summary>Forwards <paramref name="context"/> to <paramref name="next"/> when the ids match.</summary>
    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    /// <summary>Records the configured id in a filter scope named "MyId".</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("MyId").Add("MyId", MyId);
    }
}
/// <summary>
/// Pipeline filter for <c>ConsumeContext&lt;IMyRequest&gt;</c> that passes a message on to the
/// next pipe only when its MyId equals the id given at construction.
/// Both interface members are implemented explicitly.
/// </summary>
public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>>
{
    // Id this filter instance lets through.
    private string MyId { get; set; }

    /// <param name="myId">The id whose messages should be forwarded.</param>
    public MyFilter1(string myId) => MyId = myId;

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        // Guard clause: a message for a different id ends here without calling the next pipe.
        if (context.Message.MyId != MyId)
        {
            return;
        }

        await next.Send(context);
    }

    // Intentionally reports nothing to the probe.
    void IProbeSite.Probe(ProbeContext context)
    {
    }
}
/// <summary>
/// Consumes <c>MyRequest</c> messages and replies to the requester with a <c>MyResponse</c>
/// that echoes the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    /// <summary>Logs the incoming request, sends the response and logs that it was sent.</summary>
    /// <param name="context">Consume context carrying the request message.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . . (elided in the original listing)

        // Await the response so a send failure surfaces in this consumer instead of being
        // dropped, and so the "sent" log line below is only written after the send completed.
        // NOTE(review): the MyResponse type as listed has no ResponseStatus property and its
        // enum member is spelled "passed" — presumably a renaming artifact; confirm.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}
请求发起者应用程序
const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
{
h.Username(RABBIT_USER_NAME);
h.Password(RABBIT_PASSWORD);
}));
TaskUtil.Await(() => bus.StartAsync());
MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
IRequestClient<LightupRequest> client =
busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));
request.MyId = myId;
request.sid = 123456;
request.tid = 123457;
request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
request.Token = "111111111111111111111111111111";
request.Version = "10.02.003.0004";
Task.Run(async () =>
{
request.StartTime = DateTime.Now;
var response = await client.GetResponse<MyResponse>(request);
//do something with response
}).Wait();
}
请求/响应类型(这些没有改变)
/// <summary>Contract for request messages that carry a MyId.</summary>
public interface IMyRequest
{
/// <summary>Id that the filters in this listing compare against their configured id.</summary>
string MyId { get; set; }
}
/// <summary>Request message sent by the requester application; implements IMyRequest.</summary>
// NOTE(review): the requester code in this listing assigns request.sid and request.tid, which
// are not declared here (someId/someId2 are) — presumably a renaming artifact; confirm.
public class MyRequest : IMyRequest
{
public string MyId { get; set; }
public string Token { get; set; }
public string Data { get; set; }
public int someId { get; set; }
public int someId2 { get; set; }
public string Version { get; set; }
// Set by the requester immediately before the request is sent.
public DateTime StartTime { get; set; }
}
/// <summary>Response message returned by MyConsumer.</summary>
// NOTE(review): the class declares the ResponseStatus enum but no property of that type, while
// MyConsumer's initializer assigns `ResponseStatus = ResponseStatus.Passed` — the listing is
// inconsistent here (member is spelled "passed"); confirm against the real type.
public class MyResponse
{
public enum ResponseStatus { passed, failed, timeout, networkError }
public string MyId { get; set; }
public string ErrorData { get; set; }
}