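I am developing a Spring Boot application that uses Spring Kafka to implement a Kafka producer that sends messages to a Kafka cluster.
In the first iteration, we deployed the Spring application in an AKS cluster. The application connects to a "Kafka cluster" backed by Azure Event Hubs.
This part works well. We implemented the integration and were able to send messages to a topic and consume them in JSON format.
In the next iteration, we wanted to introduce the Avro serializer and deserializer plus a schema registry (Azure Schema Registry). We changed the code and tested the changes successfully with Testcontainers. However, when we tried to run the application in the Azure environment, it failed.
Here is a sample application: https://github.com/alvNa/spring-kafka-demo
This is part of the configuration: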
spring:
  kafka:
    bootstrap-servers: <MY-NAMESPACE>.servicebus.windows.net:9093
    security:
      protocol: SASL_SSL
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: kafka-example-application
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<MY-NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<MY-KEYNAME>;SharedAccessKey=<MY-ACCESS-KEY>";
      schema.registry.url: https://<MY-NAMESPACE>.servicebus.windows.net
      schema.group: <MY-SCHEMA-GROUP>
      specific.avro.reader: true
      auto.register.schemas: false
      use.latest.version: true
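The `sasl.jaas.config` value in the configuration above is easy to get wrong by hand, because the username is the literal string `$ConnectionString` and the password is the whole Event Hubs connection string. A minimal sketch of assembling it in code, assuming the connection string is supplied from a secret store or environment variable (the class and method names here are hypothetical, not part of any library):

```java
class EventHubsJaas {
    /**
     * Builds the JAAS line used for SASL/PLAIN against the Event Hubs Kafka endpoint.
     * The username is always the literal "$ConnectionString"; the password is the
     * full connection string, passed in by the caller.
     */
    static String jaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }
}
```

The returned string can then be supplied as the `sasl.jaas.config` property instead of hard-coding the key in the YAML file.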
I have a basic Avro schema as an example:
{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}
The Java POJO is generated with the Avro Maven/Gradle plugin:
package com.atradius.examples;

import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -90117537915021226L;
  ...
}
With this model, I create a Kafka producer using the Spring KafkaTemplate.
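import com.atradius.examples.Message;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaAvroMessageService {
  // Target topic, read from the application configuration.
  @Value("${example-app.kafka.producer.topic}")
  private String topicName;

  // Injected through the constructor that Lombok generates for final fields.
  private final KafkaTemplate<String, Message> kafkaTemplate;

  public void sendEvent(final String key, final String body) {
    // Message is the Avro-generated class shown above.
    final Message message = new Message(key, body);
    kafkaTemplate.send(topicName, key, message);
  }
}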
When I run the code and try to send a message, I get this error:
2024-02-08T17:19:18.807+01:00 ERROR 25180 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:750) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:674) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2790) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:908) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4912) ~[jackson-databind-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818) ~[jackson-databind-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3817) ~[jackson-databind-2.15.2.jar:2.15.2]
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:927) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:918) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:496) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:216) ~[kafka-schema-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:200) ~[kafka-schema-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:133) ~[kafka-avro-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) ~[kafka-avro-serializer-7.3.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1002) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1016) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:544) ~[spring-kafka-3.0.9.jar:3.0.9]
at com.atradius.example.kafka.service.KafkaAvroMessageService.sendEvent(KafkaAvroMessageService.java:31) ~[main/:na]
at com.atradius.example.kafka.api.KafkaAvroExampleController.postNewEvent(KafkaAvroExampleController.java:38) ~[main/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.11.jar:6.0.11]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.11.jar:6.0]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.11.jar:6.0.11]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.11.jar:6.0]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
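After debugging, I realized that the problem is that the Azure Schema Registry does not return what I expect. The REST call used to retrieve the schema currently returns XML.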
curl --location 'https://<MY-NAMESPACE>.servicebus.windows.net/subjects/<MY-TOPIC>/versions/latest' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json'
The response looks like this:
<feed xmlns="http://www.w3.org/2005/Atom">
  <title type="text">Publicly Listed Services</title>
  <subtitle type="text">This is the list of publicly-listed services currently available.</subtitle>
  <id>uuid:81b665e5-f924-4fdc-918e-346ccadd6fdb;id=90772</id>
  <updated>2024-02-08T16:20:47Z</updated>
  <generator>Service Bus 1.1</generator>
</feed>
instead of returning the JSON schema, like this:
{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}
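The `JsonParseException` in the stack trace comes from the Confluent client handing a body that starts with `<` (code 60) to a JSON parser. A minimal sketch of a check that tells the two kinds of response apart before parsing, useful when probing an endpoint by hand; the class and method names are hypothetical, and the check only looks at the first non-whitespace character:

```java
class RegistryResponseCheck {
    /**
     * Returns true when the first non-whitespace character opens a JSON object
     * or array. An Atom/XML body such as "<feed ...>" returns false.
     */
    static boolean looksLikeJson(String body) {
        for (int i = 0; i < body.length(); i++) {
            char c = body.charAt(i);
            if (Character.isWhitespace(c)) {
                continue; // skip leading whitespace
            }
            return c == '{' || c == '[';
        }
        return false; // empty or whitespace-only body
    }
}
```

A `false` result for the registry URL suggests the endpoint does not speak the JSON REST protocol the client expects, rather than a problem with the schema itself.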
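Does anyone have experience integrating with Azure Schema Registry? Do you know what causes this behavior?
Do I need some specific configuration in Azure Schema Registry?
Thanks.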
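After several weeks of testing and asking the Microsoft team for help, we got Avro working end to end with Azure Event Hubs and Schema Registry.
It was somewhat painful, so I hope this helps others.
Azure Schema Registry on Event Hubs needs a specific configuration that differs from the standard Confluent configuration.
In Confluent, the default subject naming strategy is TopicNameStrategy, but it is flexible and you can change it. In Azure Schema Registry, only one strategy is available: RecordNameStrategy. If you choose any other strategy, it will not work.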
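To make the difference between the two strategies concrete, here is a small sketch of how each one derives the name a schema is looked up under. It mirrors the Confluent naming rules (topic plus a `-key` or `-value` suffix versus the fully qualified record name); the class and method names are hypothetical, and how Azure maps the record name onto a schema group is not shown.

```java
class SubjectNames {
    /** TopicNameStrategy: "<topic>-key" for keys, "<topic>-value" for values. */
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** RecordNameStrategy: the fully qualified Avro record name, independent of the topic. */
    static String recordNameStrategy(String namespace, String recordName) {
        if (namespace == null || namespace.isEmpty()) {
            return recordName; // records without a namespace use the bare name
        }
        return namespace + "." + recordName;
    }
}
```

For the `Message` schema from the question, `recordNameStrategy("com.atradius.examples", "Message")` yields `com.atradius.examples.Message`, whatever topic the record is sent to.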
When you use Avro with Confluent, you have 2 classes to serialize and deserialize messages in Kafka:
> io.confluent.kafka.serializers.KafkaAvroSerializer
> io.confluent.kafka.serializers.KafkaAvroDeserializer
For Microsoft Azure, you should use the Azure-specific implementations:
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class
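It was also hard to work out which library to use. In the Azure ecosystem, I found 4 similar libraries:
The first one is the correct one. It does not look actively maintained, but people at Microsoft told our team that it is supported and recommended.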
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-schemaregistry-kafka-avro</artifactId>
  <version>1.1.1</version>
</dependency>
If you use an Azure managed identity with your Kafka application, you need these dependencies:
<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>spring-cloud-azure-starter</artifactId>
  <version>5.7.0</version>
</dependency>
<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-identity</artifactId>
  <version>1.11.4</version>
</dependency>
In both the producer and the consumer configuration, a default token credential must be set to access the schema registry. For example:
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
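In the consumer, we also have to configure 2 additional properties, specific.avro.reader and specific.avro.value.type. If we do not set the value type, the consumer fails. We can set the specific Avro class we want to use, which in the previous example is Message. But we can make this solution more generic by using the SpecificRecordBase class, because all Avro classes generated with the Avro Maven plugin extend it.
In summary, we had to change all of these settings: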
value.subject.name.strategy=RecordNameStrategy.class
schema.registry.credential=tokenCredential
value.serializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class
value.deserializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class
specific.avro.reader=true
specific.avro.value.type=org.apache.avro.specific.SpecificRecordBase.class
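As a rough sketch, the consumer-side settings from this summary could be collected into a property map like the one below. The class and method names are hypothetical. To keep the example free of library dependencies, the credential is typed as `Object` and the deserializer is given by its class name; in a real application the credential would be the `TokenCredential` built above and the value type would be `SpecificRecordBase.class` or a generated class such as `Message.class`. The subject name strategy entry is left out because its fully qualified class name is not given here.

```java
import java.util.HashMap;
import java.util.Map;

class AzureAvroConsumerSettings {
    // Fully qualified name of the Azure deserializer named in the text.
    static final String AZURE_AVRO_DESERIALIZER =
            "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer";

    /**
     * Builds the schema-registry related consumer properties.
     *
     * @param tokenCredential credential object used to reach the schema registry
     * @param valueType       class the deserializer should produce
     */
    static Map<String, Object> build(Object tokenCredential, Class<?> valueType) {
        Map<String, Object> props = new HashMap<>();
        props.put("value.deserializer", AZURE_AVRO_DESERIALIZER);
        props.put("schema.registry.credential", tokenCredential);
        props.put("specific.avro.reader", true);
        props.put("specific.avro.value.type", valueType);
        return props;
    }
}
```

The resulting map would be merged with the usual Kafka settings (bootstrap servers, group id, SASL properties) before the consumer factory is created.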