RabbitMq - ConversationId vs CorrelationId - 哪个更适合跟踪特定请求?

问题描述 投票:1回答:1

RabbitMQ似乎有两个非常相似的属性,我不完全理解它们的区别。 ConversationIdCorrelationId

我的用例如下。我有一个生成Guid的网站。该网站调用AP​​I,将该唯一标识符添加到HttpRequest标头中。这反过来又向RabbitMQ发布消息。该消息由第一个消费者处理并在其他地方传递给另一个消费者,依此类推。

出于记录目的,我想记录一个标识符,该标识符将初始请求与所有后续操作联系起来。对于整个应用程序的不同部分,这应该是唯一的。因此。当记录到像Serilog / ElasticSearch这样的东西时,这很容易看出哪个请求触发了初始请求,并且整个应用程序中该请求的所有日志条目都可以相互关联。

我创建了一个提供程序,它查看传入的HttpRequest的标识符。我称之为“CorrelationId”,但我开始怀疑这是否真的应该命名为“ConversationId”。就RabbitMQ而言,“ConversationId”的概念是否更适合这个模型,还是“CorrelationId”更好?

这两个概念有什么区别?

在代码方面,我希望做到以下几点。首先在我的API中注册总线并配置SendPublish以使用提供商提供的CorrelationId

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

作为参考,这是我简单的提供者界面

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

以及AspNetCore实现,它提取由调用客户端(即网站)设置的唯一ID。

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        return Guid.NewGuid();
    }
}

最后,我的服务主机是简单的Windows服务应用程序,它们可以使用已发布的消息。他们使用以下内容来获取CorrelationId,并可能在其他服务主机中发布给其他消费者。

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // correlationid or conversationIs?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

然后我在我的消费者中使用一个记录器,使用该提供程序来提取CorrelationId

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

阅读docs,它说下面的“ConversationId”:

对话由发送或发布的第一条消息创建,其中没有现有的上下文可用(例如,使用IBus.Send或IBus.Publish发送或发布消息时)。如果使用现有上下文发送或发布消息,则会将ConversationId复制到新消息,以确保同一对话中的一组消息具有相同的标识符。

现在我开始认为我的术语已经混淆了,从技术上来说这是一个对话(尽管'对话'就像'电话游戏')。

那么,CorrelationId在这个用例中,还是ConversationId?请帮我弄清楚我的术语吧!

c# rabbitmq masstransit
1个回答
3
投票

在消息对话中(提示预测乐谱),可以有一条消息(我告诉你要做某事,或者我告诉所有正在听的事情发生了什么事)或多条消息(我告诉你要做点什么,然后告诉你别人,或者我告诉所有正在倾听事情发生的人和那些听众告诉他们的朋友,等等。

使用MassTransit,从第一条消息到最终消息,正确使用,这些消息中的每一条消息都具有相同的ConversationId。在消息消费期间,MassTransit将未经修改的ConsumeContext属性复制到每个传出消息。这使得一切都成为同一条痕迹 - 一个对话。

但是,MassTransit默认不设置CorrelationId。如果消息属性名为CorrelationId(或CommandId或EventId),则可以自动设置,也可以添加自己的名称。

如果消耗的消息上存在CorrelationId,则任何传出消息都会将CorrelationId属性复制到InitiatorId属性(原因和效果 - 消耗的消息在创建后续消息时启动)。这形成了一个链(或跨度,在跟踪术语中),可以遵循该链以显示来自初始消息的消息的传播。

CorrelationId应被视为命令或事件的标识符,以便在整个系统日志中可以看到该命令的效果。

听起来像是来自HTTP的输入可能是启动器,因此将该标识符复制到InitiatorId并为消息创建新的CorrelationId,或者您可能只想对初始CorrelationId使用相同的标识符并让后续消息使用它作为发起者。

© www.soinside.com 2019 - 2024. All rights reserved.