Kafka consumer unit test fails when using Avro Schema Registry

Problem description (0 votes, 5 answers)

I am writing a consumer that listens to a Kafka topic and consumes messages as they become available. I have tested the logic/code by running Kafka locally, and it works fine.

While writing unit/component test cases, the test fails because of the Avro schema registry URL. I have tried the different options available on the internet but could not find anything that works. I am not sure whether my approach is correct. Please help.

Listener class

@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
        try {
            GenericRecord generic = consumerRecord.value();
            Object obj = generic.get("metadata");

            ObjectMapper mapper = new ObjectMapper();

            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);

            System.out.println("Received payload :   " + consumerRecord.value());

            //Call backend with details in GenericRecord 

        } catch (Exception e) {
            System.out.println("Exception while reading message from Kafka " + e);
        }
    }

Kafka configuration

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(genericConsumerFactory());
        return factory;
    }

public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
        return new DefaultKafkaConsumerFactory<>(config);
    }

Avro schema

{
   "type":"record",
   "name":"KafkaEvent",
   "namespace":"com.ms.model.avro",
   "fields":[
      {
         "name":"metadata",
         "type":{
            "name":"metadata",
            "type":"record",
            "fields":[
               {
                  "name":"correlationid",
                  "type":"string",
                  "doc":"this is corrleation id for transaction"
               },
               {
                  "name":"subject",
                  "type":"string",
                  "doc":"this is subject for transaction"
               },
               {
                  "name":"version",
                  "type":"string",
                  "doc":"this is version for transaction"
               }
            ]
         }
      },
      {
         "name":"name",
         "type":"string"
      },
      {
         "name":"dept",
         "type":"string"
      },
      {
         "name":"empnumber",
         "type":"string"
      }
   ]
}

Here is the test code I have tried...

@ComponentTest
    @RunWith(SpringRunner.class)
    @EmbeddedKafka(partitions = 1, topics = { "positionmgmt.v1" })
    @SpringBootTest(classes={Application.class})
    @DirtiesContext
    public class ConsumeKafkaMessageTest {

      private static final String TEST_TOPIC = "positionmgmt.v1";

      @Autowired(required=true)
      EmbeddedKafkaBroker embeddedKafkaBroker;

      private Schema schema;

      private  SchemaRegistryClient schemaRegistry;
      private  KafkaAvroSerializer avroSerializer;
      private  KafkaAvroDeserializer avroDeserializer;

      private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
      private String registryUrl = "unused";

      private String avroSchema = "..."; // the Avro schema JSON shown above, as a String

      @BeforeEach
      public void setUp() throws Exception {
        Schema.Parser parser = new Schema.Parser();
        schema = parser.parse(avroSchema);

        mockSchemaRegistryClient.register("Vendors-value", schema);
      }

      @Test
      public void consumeKafkaMessage_receive_sucess() {

        Schema metadataSchema = schema.getField("metadata").schema();
        GenericRecord metadata = new GenericData.Record(metadataSchema);
        metadata.put("version", "1.0");
        metadata.put("correlationid", "correlationid");
        metadata.put("subject", "metadata");

        GenericRecord record = new GenericData.Record(schema);
        record.put("metadata", metadata);
        record.put("name", "ABC");
        record.put("dept", "XYZ");

        Consumer<String, GenericRecord> consumer = configureConsumer();
        Producer<String, GenericRecord> producer = configureProducer();

        ProducerRecord<String, GenericRecord> prodRecord = new ProducerRecord<String, GenericRecord>(TEST_TOPIC, record);

        producer.send(prodRecord);

        ConsumerRecord<String, GenericRecord> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertNotNull(singleRecord.value());

        consumer.close();
        producer.close();

      }

      private Consumer<String, GenericRecord> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupid", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<String, GenericRecord> consumer = new DefaultKafkaConsumerFactory<String, GenericRecord>(consumerProps).createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
      }

      private Producer<String, GenericRecord> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, mockSchemaRegistryClient);
        producerProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
        return new DefaultKafkaProducerFactory<String, GenericRecord>(producerProps).createProducer();
      }

}

Error

component.com.ms.listener.ConsumeKafkaMessageTest > consumeKafkaMessage_receive_sucess() FAILED
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:318)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:305)
        at component.com.ms.listener.ConsumeKafkaMessageTest.configureProducer(ConsumeKafkaMessageTest.java:125)
        at component.com.ms.listener.ConsumeKafkaMessageTest.consumeKafkaMessage_receive_sucess(ConsumeKafkaMessageTest.java:97)

        Caused by:
        io.confluent.common.config.ConfigException: Invalid value io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient@20751870 for configuration schema.registry.url: Expected a comma separated list.
            at io.confluent.common.config.ConfigDef.parseType(ConfigDef.java:345)
            at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:249)
            at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
            at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
            at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
            at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
            at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
            at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
            ... 5 more
java spring-boot junit apache-kafka confluent-schema-registry
5 Answers

5 votes

I looked into it and found that the problem lies in the CachedSchemaRegistryClient used by the KafkaAvroSerializer/KafkaAvroDeserializer. It is used to fetch schema definitions from the Confluent Schema Registry.

You already have the schema definition locally, so there is no need to go to the Schema Registry (at least not in your tests).

I had a similar problem and solved it by creating custom KafkaAvroSerializer/KafkaAvroDeserializer classes.

Here is an example of the KafkaAvroSerializer. It is fairly simple: you only need to extend the provided KafkaAvroSerializer and tell it to use a MockSchemaRegistryClient.

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
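
In the test from the question, this lets configureProducer() keep schema.registry.url as a plain (unused) string and reference the custom serializer by class name, instead of putting the mock-client object into the config map; a minimal sketch:

// Sketch: the registry URL is never contacted, but it must be a String
// (passing the MockSchemaRegistryClient object caused the original error).
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        CustomKafkaAvroSerializer.class.getName());
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://not-used");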

Here is an example of the KafkaAvroDeserializer. When its deserialize method is called, you need to tell it which schema to use.

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);  
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized Schema getById(int id) {
                return schema$;
            }
        };
    }
}

The last step is to tell Spring to use the created serializer/deserializer:

# use the fully-qualified class names of your custom (de)serializers here
spring.kafka.producer.properties.schema.registry.url=not-used
spring.kafka.producer.value-serializer=CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id=showcase-producer-id

spring.kafka.consumer.properties.schema.registry.url=not-used
spring.kafka.consumer.value-deserializer=CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=showcase-consumer-id
spring.kafka.auto.offset.reset=earliest

spring.kafka.producer.auto.register.schemas=true
spring.kafka.properties.specific.avro.reader=true

I wrote a short blog post about this: https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1?source=friends_link&sk=e55f73b86504e9f577e259181c8d0e23

Link to a working example project: https://github.com/ivlahek/kafka-avro-without-registry


2 votes

@ivlahek's answer works, but if you are looking at this example 3 years later, you may want to make a slight modification to CustomKafkaAvroDeserializer:

private static SchemaRegistryClient getMockClient(final Schema schema) {
    return new MockSchemaRegistryClient() {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
                throws IOException, RestClientException {
            return new AvroSchema(schema);
        }
    };
}
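
For reference, the ParsedSchema and AvroSchema types used above come from the newer schema-registry client API; the imports below match recent Confluent client releases (package names assumed):

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;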

0 votes

As the error says, you need to give the registry config in the producer a string, not an object.

Since you are using the mock class, the string could be anything...

However, you will need to construct the serializer from the registry instance:

Serializer<Object> serializer = new KafkaAvroSerializer(mockSchemaRegistry);
// make a config map with ("schema.registry.url", "unused")
Map<String, Object> config = Collections.singletonMap("schema.registry.url", "unused");
serializer.configure(config, false); // false = configuring a value (not key) serializer

Otherwise, it will try to create a non-mocked client.

Then put it into the properties:

producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
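
If your client version rejects a Serializer instance in the config map (the plain config path expects a class name), spring-kafka can also take pre-built serializer instances directly; a minimal sketch, assuming the DefaultKafkaProducerFactory(Map, Serializer, Serializer) overload:

// Sketch: hand the pre-configured serializer instance straight to the
// factory instead of going through the config map.
Producer<String, Object> producer =
        new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), serializer)
                .createProducer();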

0 votes

If your @KafkaListener is in the test class, you can read the record with a StringDeserializer and then convert it to the desired class manually:

    @Autowired
    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;

    @KafkaListener(topics = "test")
    public void inputData(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received record='{}' with payload='{}'", consumerRecord, consumerRecord.value());

        GenericRecord genericRecord = (GenericRecord) myKafkaAvroDeserializer.deserialize("test",
                consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));

        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
    }
@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized org.apache.avro.Schema getById(int id) {
                return schema$;
            }
        };
    }
}

Remember to add the schema registry URL and the key/value deserializers in application.yml, even though they will not be used:

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: http://localhost:8080

0 votes

Instead of using new MockSchemaRegistryClient(), you should use MockSchemaRegistry.getClientForScope("dummy-registry"). This should match schema.registry.url = mock://dummy-registry, which ties the consumer to your test.
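
A minimal sketch of that wiring, assuming a Confluent client recent enough (5.4+) to support the mock:// URL scheme and MockSchemaRegistry.getClientForScope (from io.confluent.kafka.schemaregistry.testutil):

// Serde side: a mock:// URL makes the (de)serializer build an in-memory
// registry client identified by the scope name after "mock://".
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://dummy-registry");
consumerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://dummy-registry");

// Test side: fetch the same scoped in-memory client and pre-register the
// schema under the default TopicNameStrategy subject (<topic>-value).
SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("dummy-registry");
client.register("positionmgmt.v1-value", new AvroSchema(schema));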
