我对Masstransit有点陌生,我不太清楚如何配置这个.有没有办法把消耗的消息返回到我调用Consume方法的地方?Consume方法返回IBusControl,但我无法掌握消息。
实际的Consume方法是通过ReceiveEndpoint调用的,而这个Consume方法返回一个Task持有消息。所以我的问题是,如何把这个消息传回ConsumeMessage方法,以便我可以用结果填充InventoryItemObject。
public IInventory ConsumeMessage(string domainId)
{
using (consumer = new MassTransitConsumer(MassTransitConsumer.GetConfiguration()))
{
try
{
var message = consumer.Consume(domainId).StartAsync().Result;
}
catch (Exception ex)
{
throw new Exception(ex.Message);
}
}
return new InventoryItemObject();
}
public IBusControl Consume(string queue)
{
var endpoint = new Uri(string.Concat(configuration.HostAddress, "/", queue));
try
{
if (configuration.QueueType == "RabbitMQ" || string.IsNullOrEmpty(configuration.QueueType))
{
bus = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
IRabbitMqHost host = rabbit.Host(configuration.HostAddress, settings =>
{
settings.Username(configuration.Username);
settings.Password(configuration.Password);
});
rabbit.ReceiveEndpoint(host, queue, c =>
{
c.UseConcurrencyLimit(1);
c.UseRetry(retryConfig => retryConfig.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)));
c.Consumer(() => new InventoryConsumer(_connectionString));
});
});
bus.Start();
}
}
catch (Exception ex)
{
Logger.LogError(ex.Message);
}
return bus;
}
public Task Consume(ConsumeContext<IInventory> context)
{
var inventory = context.Message;
_logger.LogDebug($"Order object: {JsonConvert.SerializeObject(inventory, Formatting.Indented)}");
_logger.LogDebug("Stating timer for UserChangeEventConsumer:");
var timer = new Stopwatch();
timer.Start();
var message = context.Message;
SetMessageAsConsumed(message);
timer.Stop();
_logger.LogDebug($"UserChangeEventConsumer completed in {timer.Elapsed.TotalSeconds} seconds.");
return Task.FromResult(message);
}
你想返回消息有什么具体原因吗?据我所知,最好的做法是干脆不要让Consume方法返回消息,因为不应该有任何人在那里监听。
如果你想返回一些东西给发布者,你应该使用 应答模式