如何使用 RabbitMQ 服务总线加过滤器实现 MassTransit 的请求/响应模式?

问题描述 投票:0回答:1

我正在使用 RabbitMQ 使用 IBusControl 对象和仅 1 个队列来实现 MassTransit 的请求/响应模式。

我们将运行 1 个请求发起者应用程序(当前是单元测试)和多个消费者(控制台)应用程序。每个消费者应用程序只会响应具有给定 MyId(命令行参数)的消息。

我假设我应该通过向 ReceiveEndpoint 添加过滤器来完成此任务 定义。我正在使用最新的 MassTransit (8.1.3.0)。

我尝试在下面的代码中创建一个 ConsumeContext 过滤器 (MyFilter1) (e.UseFilter(new MyFilter1(myId));) 这个过滤器从未被击中。 现在我正在尝试创建一个 ConsumerConsumeContext 过滤器(MyFilter)。我相信这是 我想要的过滤器类型。

我发现的所有代码示例似乎都在使用旧版本的 MassTransit,并且它们能够调用 UseConsumeFilter 传入新的过滤器对象。
像这样... e.UseConsumeFilter(new MyFilter1(myId));

e.UseConsumeFilter 现在接受类型和注册上下文,而不是过滤器对象。 我可以向 Program 类添加一个静态变量并给它一个 MyId。
但是我应该发送什么作为注册上下文?
或者有没有更好的方法来过滤掉不适用的消息?

此外,如果我让过滤器正常工作,MassTransit 会接受此请求并将其发送给下一个消费者还是会丢失?

这是代码。为了便于阅读,我更改了类名并删除了所有 try/catch 块和一些函数。

消费者应用程序

static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }
    
    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"];
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
   
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });
                        
        x.ReceiveEndpoint(rabbitQueue,
            e =>{
                e.Consumer<MyConsumer>(  );
            
                !!!!INSERT LINE HERE!!!!
                //the following line never hits the filter before consumer
                //gets called
                //e.UseFilter(new MyFilter1(myId));
                //e.UseConsumeFilter(typeof (MyFilter),null);
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

}

public class MyFilter: 
    IFilter<ConsumerConsumeContext<MyConsumer,IMyRequest>>
    
{
    private string MyId { get; set; }
    public MyFilter(string myId) 
    {
        MyId = myId;
    }

    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        if (context.Message.MyId == MyId)
        {
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("MyId");
        scope.Add("MyId", MyId);
    }
}

public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>> 
{
    private string MyId { get; set; }
    public MyFilter1(string myId)
    {
        MyId = myId;
    }

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        if (context.Message.MyId == MyId )
        {
            await next.Send(context);
        }
    }

    void IProbeSite.Probe(ProbeContext context)
    {
        
    }
}


public class MyConsumer : IConsumer<MyRequest>
{

    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        . . . do stuff . . .
        
        context.Respond(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");       
    }
}

请求发起者应用程序

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
TaskUtil.Await(() => bus.StartAsync());
var serviceAddress = new Uri(RABBIT_SERVICE);
IRequestClient<LightupRequest> client =
    busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

请求/响应类型

public interface IMyRequest
{
    string MyId { get; set; }
}
public class MyRequest : IMyRequest
{
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    public DateTime StartTime { get; set; }
}

public class MyResponse
{
    public enum ResponseStatus { passed, failed, timeout, networkError }
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}

我尝试在下面的代码中创建一个 ConsumeContext 过滤器 (MyFilter1) (e.UseFilter(new MyFilter1(myId));)

我希望这个过滤器会在消费者之前被命中,这样如果 ID 不是我期望的 ID,它可以阻止该消费者消费该消息。

我在 MyFilter1.Send 上设置了断点,但它从未命中。

此外,当我运行订阅者应用程序的 4 个实例时,我可以看到 MassTransit 仅将消息分发到 4 个可执行文件中的 1 个,并且下一条消息将位于不同的可执行文件上。

响应后的变化

我更改了队列名称以包含 myId,并且添加了绑定

消费者应用程序

static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }
    
    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"]+ $".{myId}";
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
   
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });
                        
        x.ReceiveEndpoint(rabbitQueue,
            e =>{
                e.Consumer<MyConsumer>(  );
                e.Bind<MyConsumer>();
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

}

public class MyFilter: 
    IFilter<ConsumerConsumeContext<MyConsumer,IMyRequest>>
    
{
    private string MyId { get; set; }
    public MyFilter(string myId) 
    {
        MyId = myId;
    }

    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        if (context.Message.MyId == MyId)
        {
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("MyId");
        scope.Add("MyId", MyId);
    }
}

public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>> 
{
    private string MyId { get; set; }
    public MyFilter1(string myId)
    {
        MyId = myId;
    }

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        if (context.Message.MyId == MyId )
        {
            await next.Send(context);
        }
    }

    void IProbeSite.Probe(ProbeContext context)
    {
        
    }
}


public class MyConsumer : IConsumer<MyRequest>
{

    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        . . . do stuff . . .
        
        context.Respond(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");       
    }
}

请求发起者应用程序

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
TaskUtil.Await(() => bus.StartAsync());

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
    IRequestClient<LightupRequest> client =
        busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));


    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

请求/响应类型(这些没有改变)

public interface IMyRequest
{
    string MyId { get; set; }
}
public class MyRequest : IMyRequest
{
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    public DateTime StartTime { get; set; }
}

public class MyResponse
{
    public enum ResponseStatus { passed, failed, timeout, networkError }
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}

观察结果

当消费者应用程序运行时,在 RabbitMQ 上创建以下内容

  • 1 个连接
  • 2 个频道
  • 交换。 绑定自 交换:MyConsumer(无路由密钥) 交换:IMyRequest(无路由密钥) 绑定到 队列.
  • 交易所:我的消费者 绑定到 交换 。 (没有路由密钥)
  • 交换:IMyRequest 绑定到 交换 。 (没有路由密钥)
  • 队列。 绑定自 交换 。 (没有路由密钥)

当我使用新的 myId 运行消费者应用程序的第二个实例时,以下是添加的内容

  • 1 个连接

  • 2 个频道

  • 交流.<2nd Id> 绑定自 交换:MyConsumer(无路由密钥) 交换:IMyRequest(无路由密钥) 绑定到 队列.<2nd Id>

  • 队列.<2nd Id> 绑定自 交换.<2nd Id>(无路由密钥)

  • Exchange :MyConsumer 获得添加到 Exchange 的绑定。<2nd Id>

  • Exchange :IMyRequest 获取添加到 Exchange 的绑定。<2nd Id>

最后一个问题

我真的很喜欢这种模式的简单使用,这是确保您的请求不会陷入黑洞的简单方法。 但是,每个 id 创建 1 个队列和 1 个交换器。每次运行消费者应用程序时,我都会看到 2 个通道和 1 个连接。 这是确保我们的回复不会丢失的最佳方法吗?

c# rabbitmq masstransit request-response masstransit-courier
1个回答
0
投票

Chris Patterson 回答后,这是最终代码

消费者应用程序

static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }
    
    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"]+ $".{myId}";
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
   
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });
                        
        x.ReceiveEndpoint(rabbitQueue,
            e =>{
                e.Consumer<MyConsumer>(  );
                e.Bind<MyConsumer>();
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

}

public class MyFilter: 
    IFilter<ConsumerConsumeContext<MyConsumer,IMyRequest>>
    
{
    private string MyId { get; set; }
    public MyFilter(string myId) 
    {
        MyId = myId;
    }

    public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        if (context.Message.MyId == MyId)
        {
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("MyId");
        scope.Add("MyId", MyId);
    }
}

public class MyFilter1 :
    IFilter<ConsumeContext<IMyRequest>> 
{
    private string MyId { get; set; }
    public MyFilter1(string myId)
    {
        MyId = myId;
    }

    async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
    {
        if (context.Message.MyId == MyId )
        {
            await next.Send(context);
        }
    }

    void IProbeSite.Probe(ProbeContext context)
    {
        
    }
}


public class MyConsumer : IConsumer<MyRequest>
{

    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        . . . do stuff . . .
        
        context.Respond(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");       
    }
}

请求发起者应用程序

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
TaskUtil.Await(() => bus.StartAsync());

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
    IRequestClient<LightupRequest> client =
        busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));


    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

请求/响应类型(这些没有改变)

public interface IMyRequest
{
    string MyId { get; set; }
}
public class MyRequest : IMyRequest
{
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    public DateTime StartTime { get; set; }
}

public class MyResponse
{
    public enum ResponseStatus { passed, failed, timeout, networkError }
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}
© www.soinside.com 2019 - 2024. All rights reserved.