如何在MassTransit中制作过滤器?

问题描述 投票:0回答:1

我正在尝试处理 MassTransit,我有许多带有 StableDefussion 的服务器,它们接受在队列中生成图像的任务

但事实是不同的服务器有不同的模型集,我不想为每个模型创建很多队列

我的 GPU 服务器如何从队列中过滤任务并仅将那些可以执行的任务投入工作?

我的发布服务器

internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_results_queue", e => {
                e.Consumer(() => new ResultConsumer());
            });
        });
        await busControl.StartAsync();
        Console.WriteLine("Publisher is running...");


        await busControl.Publish<CalculationTask>(new {
            TaskId = NewId.NextGuid().ToString(),
            Data = "Important data for calculation"
        }, context => {
            context.Headers.Set("model", "model_comic");
        });


        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}

我的 GPU 工作者

internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_task_queue", e => {
                e.Bind("headers_exchange", x => {
                    x.ExchangeType = "headers";
                    x.SetExchangeArgument("x-match", "any");
                    x.SetExchangeArgument("model", "model_real");
                    x.SetExchangeArgument("model", "model_anime");
                });

                e.Consumer(() => new CalculationTaskConsumer());
            });
        });

        await busControl.StartAsync();


        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}

我的工作人员不应该接收消息,但他还是这么做了。

c# rabbitmq masstransit
1个回答
0
投票
cfg.ReceiveEndpoint("calculation_task_queue", e => {

    e.ConfigureConsumeTopology = false;
    
    e.Bind("headers_exchange", x => {
        x.ExchangeType = "headers";
        x.SetExchangeArgument("x-match", "any");
        x.SetExchangeArgument("model", "model_real");
        x.SetExchangeArgument("model", "model_anime");
    });

    e.Consumer(() => new CalculationTaskConsumer());
});

默认拓扑配置可能是原因,但您可以通过查看 RabbitMQ 管理控制台并查看附加绑定来找出答案。

© www.soinside.com 2019 - 2024. All rights reserved.