Spring Integration 中的跟踪(Spring boot 3.0.9)

问题描述 投票:0回答:1

我正在尝试使用 Spring 集成:我通过 JMS 收到一条消息并将其发送到 Kafka:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
                .log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
                .channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
    }

为了检查,我使用 POST 请求 + JmsTemplate 向 MQ 发送消息:

    @PostMapping("/mq")
    public String sentToMq(@RequestBody final String body) {
        jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
            final var span = tracer.startScopedSpan("jms-send");
            try {
                final var context = span.context();
                m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
            } finally {
                span.end();
            }

            return m;
        });
        return "done";
    }

除了追踪之外,一切正常。我在发送之前手动设置了

b3
标头,但在收到 Spring Integration 后会覆盖它。我该怎么做才能保留传入的traceId?

我还有下一个配置:

  • @EnableIntegrationManagement(observationPatterns = "*")
  •   @Bean
      @GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
      public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
          return new ObservationPropagationChannelInterceptor(observationRegistry);
      }
    
spring spring-integration spring-micrometer micrometer-tracing
1个回答
0
投票

好的。我知道问题出在哪里了。

JmsMessageDrivenEndpoint
不会将其
registerObservationRegistry(ObservationRegistry)
向下传播到
ChannelPublishingJmsMessageListener
,因此
IntegrationObservation.HANDLER
不会从该
b3
标头恢复跟踪。

MessageChannel
的观察仅参考当前上下文,并根据当前上下文真正填充新的
b3
标头。

考虑改用

Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer)
并使用
AbstractMessageListenerContainer.setObservationRegistry()
启用其
JmsObservationDocumentation.JMS_MESSAGE_PROCESS
消费者追踪。

对于

JmsMessageDrivenEndpoint
,请在 Spring Integration 中提出 GH 问题。

© www.soinside.com 2019 - 2024. All rights reserved.