I created the following test class to produce events using the AvroSerializer.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}
But when I send an event using
kafkaTemplate().send(appEventsTopic, applicationEvent);
I get the following exception.
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
Why is it trying to look up the schema when I am using MockSchemaRegistryClient?
The Avro schema registry client supports a "mock" pseudo-protocol:
schema.registry.url=mock://localhost.something
Basically, any URL prefixed with mock:// will do the job.
From AbstractKafkaAvroSerDeConfig:
public static final String SCHEMA_REGISTRY_URL_DOC =
    "Comma-separated list of URLs for schema registry instances that can be used to register "
    + "or look up schemas. "
    + "If you wish to get a connection to a mocked schema registry for testing, "
    + "you can specify a scope using the 'mock://' pseudo-protocol. For example, "
    + "'mock://my-scope-name' corresponds to "
    + "'MockSchemaRegistry.getClientForScope(\"my-scope-name\")'.";
Also set:
auto.register.schemas=true
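As a rough sketch, the two settings above could be combined into one producer property map like this. The class name MockRegistryProducerProps, the method build, and the scope name "test-scope" are made up for illustration; only the property keys and serializer class names are the standard Kafka and Confluent ones. The sketch uses plain string keys so that it compiles without any Kafka dependency on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Spring Kafka or Confluent): builds producer
// properties that point the Avro serializer at an in-memory mock registry.
public class MockRegistryProducerProps {

    static Map<String, Object> build(String bootstrapServers, String scope) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Any URL with the mock:// prefix selects the mocked registry; the part
        // after the prefix is the scope name (assumed here, pick your own).
        props.put("schema.registry.url", "mock://" + scope);
        // Let the serializer register the schema on first send instead of
        // only looking it up.
        props.put("auto.register.schemas", true);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build("localhost:9092", "test-scope")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the test from the question, the same keys could be added to the map returned by KafkaTestUtils.producerProps(embeddedKafkaBroker) and passed to the DefaultKafkaProducerFactory constructor that takes only the property map. That way the serializer is created and configured from these properties rather than constructed by hand.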
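You have configured the producer not to auto-register new schemas when producing messages, so it only tries to fetch the schema from the schema registry (SR) and cannot find it there.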
I also don't see you setting the schema registry URL, so I guess it is taking the default value.
As for your question: the mock mimics the behavior of a real schema registry, but it has its obvious drawbacks.
You can check the documentation for more information.