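I am using Spring WebFlux with Spring Boot 3.2.0, together with native Reactor Kafka.
Spring Cloud Sleuth has been moved to Micrometer Tracing. As the Sleuth project states:

> Spring Cloud Sleuth's last minor version is 3.1. You can check the 3.1.x branch for the latest commits. The core of this project got moved to Micrometer Tracing project and the instrumentations will be moved to Micrometer and all respective projects (no longer all instrumentations will be done in a single repository).

I want to implement distributed tracing, so I need the traceId and spanId.
I use the following dependencies for tracing: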

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core-micrometer</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-tracing-bridge-brave</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>


    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaConfiguration {

        private final KafkaProperties kafkaProperties;
        private final KafkaTopicProperties topicProperties;

        // Generic producer. If needed, create a custom producer for a specific object type.
        @Bean
        public <K, V> ReactiveKafkaProducerTemplate<K, V> reactiveKafkaProducerTemplate(ObservationRegistry registry, PropagatingSenderTracingObservationHandler<?> handler) {
            registry.observationConfig().observationHandler(handler);
            var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));
            var senderOptions = SenderOptions.<K, V>create(properties)
                    .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
            return new ReactiveKafkaProducerTemplate<>(senderOptions);
        }

        @Bean(name = "otpSendReactiveKafkaConsumer")
        public ReactiveKafkaConsumerTemplate<String, OtpSendRequestTransferObject> otpSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
            registry.observationConfig()
                    .observationHandler(handler);
            return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getOtp().getTopic()), registry));
        }

        @Bean(name = "smsSendReactiveKafkaConsumer")
        public ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
            registry.observationConfig()
                    .observationHandler(handler);
            return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getSms().getTopic()), registry));
        }

        private <K, V> ReceiverOptions<K, V> createReceiverOptions(List<String> topics, ObservationRegistry registry) {
            var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));
            return ReceiverOptions.<K, V>create(properties)
                    .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
                    .subscription(topics);
        }
    }


    @Service
    @Slf4j
    @RequiredArgsConstructor
    public class SendSMSReactiveKafkaProducer {

        private final KafkaTopicProperties kafkaTopicProperties;
        private final ReactiveKafkaProducerTemplate<String, SmsSendRequestTransferObject> kafkaProducerTemplate;

        public Mono<CommonResult> produce(SmsSendRequest request) {
            var topic = kafkaTopicProperties.getSms().getTopic();
            var kafkaHeaders = new RecordHeaders();
            var message = SmsSendRequestTransferObject.builder()
                    .content(request.getContent())
                    .receiver(request.getReceiver())
                    .build();
            var record = new ProducerRecord<>(topic, null, UUID.randomUUID().toString(), message, kafkaHeaders);
            return kafkaProducerTemplate.send(record)
                    .doOnError(ex -> log.error("Failed to send record={} to topic={}.", record, topic, ex))
                    .doOnSuccess(result -> log.debug("Record is successfully sent to topic={}, metadata={}", topic, result.recordMetadata()))
                    .map(result -> {
                        var hasResult = HasResult.SUCCESS;
                        if (result.exception() != null) {
                            hasResult = HasResult.UNKNOWN_ERROR;
                        }
                        return new CommonResult(hasResult);
                    });
        }
    }


    @Service
    @Slf4j
    public class SendSMSReactiveKafkaConsumer extends AbstractBaseReactiveKafkaConsumer<String, SmsSendRequestTransferObject> {

        private final RetryBackoffSpec retryBackoffSpec;
        private final SmsMessageSender smsMessageSender;
        private final ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer;

        public SendSMSReactiveKafkaConsumer(ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer,
                                            ReactiveKafkaProducerTemplate<String, ErrorRecord<SmsSendRequestTransferObject>> dlqProducerTemplate,
                                            SmsMessageSender smsMessageSender, KafkaTopicProperties topicProperties,
                                            SmsSettingProperties smsSettingProperties) {
            super(topicProperties.getSms().getDlq(), dlqProducerTemplate);
            var params = smsSettingProperties.getRetryBackOffSpec().getKafka();
            this.smsMessageSender = smsMessageSender;
            this.smsSendReactiveKafkaConsumer = smsSendReactiveKafkaConsumer;
            this.retryBackoffSpec = RetryBackoffSpecUtil.createRetryBackoffSpec(params.getMaxAttempts(), params.getBackOffPeriod());
        }

        @PostConstruct
        public void init() {
            consume()
                    .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
                    .subscribe();
        }

        @Override
        public Flux<Void> consume() {
            // Choose the receive method carefully. receiveAtMostOnce commits the offset as soon as
            // the message is received, so a message whose processing fails is not redelivered.
            return smsSendReactiveKafkaConsumer.receiveAtMostOnce()
                    .onErrorResume(Throwable.class, ex -> {
                        log.error("Exception on receiving message from Kafka.", ex);
                        return Mono.empty(); // Swallow the error; note that this completes the stream rather than resuming it
                    })
                    .flatMap(message -> {
                                return Mono.just(message)
                                        .doOnNext(this::log)
                                        .flatMap(record -> smsMessageSender.send(record.value(), retryBackoffSpec))
                                        .then()
                                        .onErrorResume(Throwable.class, ex -> {
                                            log.error("Error during message processing.", ex);
                                            return sendToDLQ(message, ex); // Send message to DLQ topic
                                        });
                            }
                    )
                    .doOnTerminate(() -> log.error("The subscription was terminated. Either it was cancelled or completed successfully."))
                    .subscribeOn(Schedulers.boundedElastic());
        }
    }

I configured the **application.yaml** file like this:

    logging:
      pattern:
        level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

This works well, but when I publish a request to a Kafka topic, I lose the current **traceId** and **spanId**.
I also enabled automatic context propagation for WebFlux with `Hooks.enableAutomaticContextPropagation();`.
When a request is received from an endpoint, tracing works and the traceId and spanId are generated. The current trace is also kept across different contexts.
[![enter image description here][1]][1]
So the problem is that I lose my **traceId** and **spanId** after publishing a message to Kafka. How do I configure reactive context propagation for Kafka?
[1]: https://i.stack.imgur.com/bKYsv.jpg
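You have to configure observation explicitly on the `SenderOptions`. Spring Boot does not auto-configure it, because there is no auto-configuration for Reactor Kafka.
See the documentation for more information: https://projectreactor.io/docs/kafka/release/reference/ (search for "5.5. Micrometer Observation").
You also have to use the `io.micrometer:micrometer-observation` dependency.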
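
For reference, the `io.micrometer:micrometer-observation` dependency could be declared as in the sketch below, in the same Maven style as the dependencies in the question. It assumes the version is managed by Spring Boot's dependency management, as for the other Micrometer artifacts listed there, so no `<version>` element is given; add one if your build does not inherit that management.

```xml
<!-- Assumption: version is supplied by Spring Boot's dependency management -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-observation</artifactId>
</dependency>
```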