批处理时验证Kafka消息

问题描述 投票:0回答:1

我正在使用 spring kafka 监听器批量处理消息,我希望在处理之前对它们进行验证。这是我配置 kafka 监听器的方式

@KafkaListener(id = "listener.id",
      topics = "kafka.events",
      idIsGroup = true,
      batch = "true",
      concurrency = "1")
  public void onMessage(@Valid @Payload ConsumerRecords<String, KafkaEvent> consumerRecords) {
    ...
  }

我的事件消息如下所示 - 字段注释有需要验证的所需约束:

@JsonIgnoreProperties(ignoreUnknown = true)
public class KafkaEvent {
    @NotNull 
    @Size(min = 1, max = 64) 
    private String eventId;
    
    @NotNull @PositiveOrZero 
    @Digits(integer = 20, fraction = 6)   
    private  BigDecimal amount;

    ....
}

根据文档https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/validation.html,我还配置了一个验证器:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

但是如果我遵循上述方法,字段不会得到验证。因此,我必须改变我的方法,创建一个错误反序列化器来处理反序列化失败,并创建一个自定义验证器,我在反序列化器中设置该验证器来验证消息。据我从文档中了解到,我们确实需要一个错误反序列化器来处理反序列化失败,但是我们真的需要一个自定义验证器来批量验证消息吗?如果我们正确放置了 @Valid @Payload 注解,批处理中的消息在反序列化后不应该由 spring 本身进行验证吗?

方法2:

application.yaml

consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      ...
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.validator.class: com.shinky.myservice.validator.CustomValidator
    

验证器

import jakarta.validation.Validation;
import jakarta.validation.Validator;
import org.springframework.validation.beanvalidation.SpringValidatorAdapter;

public class CustomValidator extends SpringValidatorAdapter {

  public CustomValidator() {
    super(validator());
  }

  public static Validator validator() {
    return Validation.byDefaultProvider()
        .configure()
        .buildValidatorFactory().getValidator();
  }
}

我想了解,是否由于使用 @Payload 和 @Valid 注释进行验证而无法正常工作而导致我在实现过程中遗漏了某些内容,或者是否出于任何有效原因而故意这样做?

spring-boot validation spring-kafka
1个回答
0
投票

这实际上是Spring 中对象列表的验证的细微变化。

要使其正常工作,请在包含

@Validated
的类上添加
@KafkaListener

就我而言,我不需要 ConsumerRecords,因此我仅使用

List<MyDto>
测试了此解决方案。

详细说明请参阅https://stackoverflow.com/a/54394177/12794090

© www.soinside.com 2019 - 2024. All rights reserved.