Distributed tracing not working with Spring WebFlux + Reactor Kafka


I am using Spring WebFlux with Spring Boot 3.2.0, and I use native Reactor Kafka.

Spring Cloud Sleuth has moved to Micrometer Tracing.

You can read about it here:

Spring Cloud Sleuth's last minor version is 3.1. You can check the 3.1.x branch for the latest commits. The core of this project has moved to the Micrometer Tracing project, and the instrumentations will be moved to Micrometer and all respective projects (no longer will all instrumentations be done in a single repository).

I want to implement distributed tracing. I need **traceId** and **spanId**.

I use the following dependencies for tracing:

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core-micrometer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;
    private final KafkaTopicProperties topicProperties;

    @Bean
    public <K, V> ReactiveKafkaProducerTemplate<K, V> reactiveKafkaProducerTemplate(ObservationRegistry registry, PropagatingSenderTracingObservationHandler<?> handler) { // Generic producer; if needed, create a custom producer for a specific object.
        registry.observationConfig().observationHandler(handler);

        var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        var senderOptions = SenderOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    @Bean(name = "otpSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, OtpSendRequestTransferObject> otpSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getOtp().getTopic()), registry));
    }


    @Bean(name = "smsSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getSms().getTopic()), registry));
    }

    private <K, V> ReceiverOptions<K, V> createReceiverOptions(List<String> topics, ObservationRegistry registry) {

        var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        return ReceiverOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
                .subscription(topics);

    }

}
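
For context, the `PropagatingSenderTracingObservationHandler` and `PropagatingReceiverTracingObservationHandler` beans injected above are normally supplied by Spring Boot's tracing auto-configuration once `micrometer-tracing-bridge-brave` is on the classpath. If they are not auto-configured in your setup, a minimal manual definition could look like the following sketch (the class and method names are illustrative):

import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
import io.micrometer.tracing.propagation.Propagator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TracingHandlerConfiguration { // illustrative name

    // Starts a span for outgoing messages and injects the trace context into the Kafka headers.
    @Bean
    public PropagatingSenderTracingObservationHandler<?> senderTracingHandler(Tracer tracer, Propagator propagator) {
        return new PropagatingSenderTracingObservationHandler<>(tracer, propagator);
    }

    // Extracts the trace context from incoming Kafka headers and continues the span.
    @Bean
    public PropagatingReceiverTracingObservationHandler<?> receiverTracingHandler(Tracer tracer, Propagator propagator) {
        return new PropagatingReceiverTracingObservationHandler<>(tracer, propagator);
    }
}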
@Service
@Slf4j
@RequiredArgsConstructor
public class SendSMSReactiveKafkaProducer {
    private final KafkaTopicProperties kafkaTopicProperties;
    private final ReactiveKafkaProducerTemplate<String, SmsSendRequestTransferObject> kafkaProducerTemplate;

    public Mono<CommonResult> produce(SmsSendRequest request) {
        var topic = kafkaTopicProperties.getSms().getTopic();

        var kafkaHeaders = new RecordHeaders();
        var message = SmsSendRequestTransferObject.builder()
                .content(request.getContent())
                .receiver(request.getReceiver())
                .build();
        var record = new ProducerRecord<>(topic, null, UUID.randomUUID().toString(), message, kafkaHeaders);

        return kafkaProducerTemplate.send(record)
                .doOnError(ex -> log.error("Failed to send record={} to topic={}.", record, topic, ex))
                .doOnSuccess(result -> log.debug("Record is successfully sent to topic={}, metadata={}", topic, result.recordMetadata()))
                .map(result -> {
                    var hasResult = HasResult.SUCCESS;
                    if (result.exception() != null) {
                        hasResult = HasResult.UNKNOWN_ERROR;
                    }
                    return new CommonResult(hasResult);
                });
    }
}

@Service
@Slf4j
public class SendSMSReactiveKafkaConsumer extends AbstractBaseReactiveKafkaConsumer<String, SmsSendRequestTransferObject> {
    private final RetryBackoffSpec retryBackoffSpec;
    private final SmsMessageSender smsMessageSender;
    private final ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer;

    public SendSMSReactiveKafkaConsumer(ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer,
                                        ReactiveKafkaProducerTemplate<String, ErrorRecord<SmsSendRequestTransferObject>> dlqProducerTemplate,
                                        SmsMessageSender smsMessageSender, KafkaTopicProperties topicProperties,
                                        SmsSettingProperties smsSettingProperties) {
        super(topicProperties.getSms().getDlq(), dlqProducerTemplate);
        var params = smsSettingProperties.getRetryBackOffSpec().getKafka();
        this.smsMessageSender = smsMessageSender;
        this.smsSendReactiveKafkaConsumer = smsSendReactiveKafkaConsumer;
        this.retryBackoffSpec = RetryBackoffSpecUtil.createRetryBackoffSpec(params.getMaxAttempts(), params.getBackOffPeriod());
    }

    @PostConstruct
    public void init() {
        consume()
                .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
                .subscribe();
    }

    @Override
    public Flux<Void> consume() {
        return smsSendReactiveKafkaConsumer.receiveAtMostOnce() // Be careful which receive mode you use: receiveAtMostOnce commits each message immediately after it is received; if processing fails, the message is not redelivered, but the commit is guaranteed.
                .onErrorResume(Throwable.class, ex -> {
                    log.error("Exception on receiving message from Kafka.", ex);
                    return Mono.empty();  // Continue processing the next message
                })
                .flatMap(message -> {
                            return Mono.just(message)
                                    .doOnNext(this::log)
                                    .flatMap(record -> smsMessageSender.send(record.value(), retryBackoffSpec))
                                    .then()
                                    .onErrorResume(Throwable.class, ex -> {
                                        log.error("Error during message processing.", ex);
                                        return sendToDLQ(message, ex); // Send message to DLQ topic
                                    });
                        }
                )
                .doOnTerminate(() -> log.error("The subscription was terminated. Either it was cancelled or completed successfully."))
                .subscribeOn(Schedulers.boundedElastic());
    }


}
I configured the **application.yaml** file like this:

    logging:
      pattern:
        level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

It works well, but when I publish a request to a Kafka topic, I lose the current **traceId** and **spanId**.

I also configured WebFlux with `Hooks.enableAutomaticContextPropagation();`.
When a request comes in through an endpoint, tracing works: a traceId and spanId are generated, and the current trace is kept across different reactive contexts.
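
For reference, that hook is typically installed once at application startup, before any reactive pipeline is created. A minimal sketch (the application class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Hooks;

@SpringBootApplication
public class NotificationApplication { // illustrative name

    public static void main(String[] args) {
        // Bridges the Reactor Context and ThreadLocals (Reactor 3.5.3+; requires
        // io.micrometer:context-propagation on the classpath) so that MDC values
        // such as traceId/spanId follow the reactive chain.
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(NotificationApplication.class, args);
    }
}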

[![Screenshot of the log output][1]][1]


So the problem is that I lose my **traceId** and **spanId** after publishing a message to Kafka. How do I configure reactive context propagation for Kafka?


  [1]: https://i.stack.imgur.com/bKYsv.jpg
spring-boot apache-kafka spring-webflux project-reactor micrometer-tracing
1 Answer

You have to explicitly configure the observation on the `SenderOptions`. Spring Boot does not configure it for you, because there is no auto-configuration for Reactor Kafka.

See more info in the documentation: https://projectreactor.io/docs/kafka/release/reference/, in the section *5.5. Micrometer Observation*.

You also have to add the `io.micrometer:micrometer-observation` dependency.
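
For completeness, that dependency would be declared alongside the others from the question:

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-observation</artifactId>
        </dependency>

The *Micrometer Observation* section of that reference also notes that the observation opened by the receiver covers only the act of receiving a record, not your downstream processing, and recommends starting an observation per record manually. Below is a sketch adapted from that section for the SMS consumer in the question; `observationRegistry` is an assumed injected field, the listener id and cluster name are illustrative, and the exact `KafkaRecordReceiverContext` constructor should be verified against your reactor-kafka version:

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;

// Inside the consumer from the question; observationRegistry is an assumed field.
public Flux<Void> consume() {
    return smsSendReactiveKafkaConsumer.receiveAtMostOnce()
            .flatMap(record -> {
                // Open an observation per record; the propagating receiver handler
                // restores traceId/spanId from the incoming record headers.
                Observation observation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
                        null,
                        KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                        () -> new KafkaRecordReceiverContext(record, "sms.receiver", "kafka"),
                        observationRegistry);

                return Mono.just(record)
                        .flatMap(r -> smsMessageSender.send(r.value(), retryBackoffSpec))
                        .then()
                        .doOnTerminate(observation::stop)
                        .doOnError(observation::error)
                        // Put the observation into the Reactor Context so that downstream
                        // operators (and MDC-based log patterns) see the restored trace.
                        .contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation));
            });
}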
