Azure Schema Registry integration issue

I am developing a Spring Boot application that uses Spring Kafka to implement a Kafka producer sending messages to a Kafka cluster.

In the first iteration, we deployed the Spring application to an AKS cluster. The application connects to a "Kafka cluster" backed by Azure Event Hubs.

That part worked well. We implemented the integration and were able to send messages to a topic and consume them in JSON format.

In the next iteration, we want to introduce Avro serializers and deserializers plus a schema registry (using Azure Schema Registry). We made the code changes and tested them successfully with Testcontainers. However, when we run the application in the Azure environment, it fails.

Here is a sample application: https://github.com/alvNa/spring-kafka-demo

This is the relevant part of the configuration:

spring:
  kafka:
    bootstrap-servers: <MY-NAMESPACE>.servicebus.windows.net:9093
    security:
      protocol: SASL_SSL
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      group-id: kafka-example-application
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<MY-NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<MY-KEYNAME>;SharedAccessKey=<MY-ACCESS-KEY>";
      schema.registry.url: https://<MY-NAMESPACE>.servicebus.windows.net
      schema.group: <MY-SCHEMA-GROUP>
      specific.avro.reader: true
      auto.register.schemas: false
      use.latest.version: true

I have a basic Avro schema as an example:

{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}

The Java POJO is generated with the Avro Maven/Gradle plugin:

package com.atradius.examples;

import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -90117537915021226L;
 ...
}

Using this model, I create a Kafka producer with the Spring KafkaTemplate:

@Service
@RequiredArgsConstructor
public class KafkaAvroMessageService {

  @Value("${example-app.kafka.producer.topic}")
  private String topicName;

  private final KafkaTemplate<String, Message> kafkaTemplate;

  public void sendEvent(final String key, final String body) {
    // The schema above generates an all-args constructor Message(number, description)
    final Message message = new Message(Integer.parseInt(key), body);

    kafkaTemplate.send(topicName, key, message);
  }
}

When I run the code and try to send a message, I get this error:

2024-02-08T17:19:18.807+01:00 ERROR 25180 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:750) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:674) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2790) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:908) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4912) ~[jackson-databind-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818) ~[jackson-databind-2.15.2.jar:2.15.2]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3817) ~[jackson-databind-2.15.2.jar:2.15.2]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:927) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:918) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:496) ~[kafka-schema-registry-client-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:216) ~[kafka-schema-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:200) ~[kafka-schema-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:133) ~[kafka-avro-serializer-7.3.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) ~[kafka-avro-serializer-7.3.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1002) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949) ~[kafka-clients-7.3.1-ccs.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1016) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:544) ~[spring-kafka-3.0.9.jar:3.0.9]
    at com.atradius.example.kafka.service.KafkaAvroMessageService.sendEvent(KafkaAvroMessageService.java:31) ~[main/:na]
    at com.atradius.example.kafka.api.KafkaAvroExampleController.postNewEvent(KafkaAvroExampleController.java:38) ~[main/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.11.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.11.jar:6.0.11]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.11.jar:6.0]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.11.jar:6.0.11]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

After debugging, I realized that the problem is that Azure Schema Registry does not return what I expect. This is the REST call used to retrieve the schema:

curl --location 'https://<MY-NAMESPACE>.servicebus.windows.net/subjects/<MY-TOPIC>/versions/latest' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json'

It currently returns XML like this:

<feed xmlns="http://www.w3.org/2005/Atom">
    <title type="text">Publicly Listed Services</title>
    <subtitle type="text">This is the list of publicly-listed services currently available.</subtitle>
    <id>uuid:81b665e5-f924-4fdc-918e-346ccadd6fdb;id=90772</id>
    <updated>2024-02-08T16:20:47Z</updated>
    <generator>Service Bus 1.1</generator>
</feed>

instead of returning the JSON schema, like this:

{
  "namespace": "com.atradius.examples",
  "type": "record",
  "name": "Message",
  "version": "1",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}

Does anyone have experience integrating with Azure Schema Registry? Do you know what causes this behavior?

Do I need some kind of configuration on the Azure Schema Registry side?

Thanks.

Reference code: https://github.com/alvNa/spring-kafka-demo

spring-boot azure apache-kafka spring-kafka confluent-schema-registry
1 Answer

After several weeks of testing, and after asking the Microsoft team for help, we managed to get the end-to-end flow working with Avro on Azure Event Hubs and Azure Schema Registry.

It was a bit painful, so I hope this helps someone else.

Azure Schema Registry on Event Hubs requires a specific configuration that differs from the standard Confluent setup.

In Confluent, the default schema naming strategy is TopicNameStrategy, but it is flexible and you can change it. In Azure Schema Registry there is only one available strategy: RecordNameStrategy. If you choose a different strategy, it will not work.

When you use Avro with Confluent, there are two classes to serialize and deserialize Kafka messages:

> io.confluent.kafka.serializers.KafkaAvroSerializer
> io.confluent.kafka.serializers.KafkaAvroDeserializer

For Microsoft Azure, you should use these specific implementations instead:

com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class

Figuring out which library to use is also not easy. In the Azure ecosystem I found four similar-looking libraries:

  1. https://mvnrepository.com/artifact/com.microsoft.azure/azure-schemaregistry-kafka-avro (*)
  2. https://mvnrepository.com/artifact/com.azure/azure-data-schemaregistry-apacheavro
  3. https://mvnrepository.com/artifact/com.azure/azure-core-serializer-avro-apache
  4. https://mvnrepository.com/artifact/com.azure/azure-data-schemaregistry-avro

The first one (*) is the correct one. It does not look actively maintained, but the Microsoft team told us it is still supported and recommended using it.

    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-schemaregistry-kafka-avro</artifactId>
        <version>1.1.1</version>
    </dependency>

If you use Azure managed identities with your Kafka application, you also need these dependencies:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter</artifactId>
    <version>5.7.0</version>
</dependency>
<dependency>
   <groupId>com.azure</groupId>
   <artifactId>azure-identity</artifactId>
   <version>1.11.4</version>
</dependency>

In both the producer and the consumer configuration you must set a default token credential in order to access the schema registry. For example:

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
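
Putting the producer side together, a minimal sketch could look like this. The serializer class and the property keys (schema.registry.url, schema.group, schema.registry.credential, auto.register.schemas) are the ones mentioned in the question and in this answer; the configuration class, bean names and the <MY-NAMESPACE>/<MY-SCHEMA-GROUP> placeholders are only illustrative, and the Event Hubs SASL_SSL settings are omitted. Treat it as a sketch, not the definitive setup.

import java.util.HashMap;
import java.util.Map;

import com.atradius.examples.Message; // Avro-generated class from the schema above
import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class AvroProducerConfig {

  @Bean
  public ProducerFactory<String, Message> producerFactory() {
    // Token credential used by the Azure serializer to authenticate against the schema registry
    TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<MY-NAMESPACE>.servicebus.windows.net:9093");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Azure-specific serializer instead of io.confluent.kafka.serializers.KafkaAvroSerializer
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class);

    // Azure Schema Registry settings (keys as listed in this answer)
    props.put("schema.registry.url", "https://<MY-NAMESPACE>.servicebus.windows.net");
    props.put("schema.group", "<MY-SCHEMA-GROUP>");
    props.put("schema.registry.credential", tokenCredential);
    props.put("auto.register.schemas", false);

    // security.protocol, sasl.mechanism and sasl.jaas.config for Event Hubs omitted here

    return new DefaultKafkaProducerFactory<>(props);
  }

  @Bean
  public KafkaTemplate<String, Message> kafkaTemplate(ProducerFactory<String, Message> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }
}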

In the consumer we also had to configure two additional properties, specific.avro.reader and specific.avro.value.type. If the value type is not set, the consumer fails. You can set it to the concrete Avro class you want to use (the Message class in the previous example), but the solution can be made more generic by using the SpecificRecordBase class, since all Avro objects generated with the Avro Maven plugin extend it.
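
On the consumer side, a corresponding sketch could look like the following, using SpecificRecordBase as the generic value type. Again, the deserializer class and the property keys are the ones named in this answer, while the bean names and placeholders are only illustrative and the SASL_SSL settings are omitted.

import java.util.HashMap;
import java.util.Map;

import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class AvroConsumerConfig {

  @Bean
  public ConsumerFactory<String, SpecificRecordBase> consumerFactory() {
    TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<MY-NAMESPACE>.servicebus.windows.net:9093");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-application");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Azure-specific deserializer instead of io.confluent.kafka.serializers.KafkaAvroDeserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);

    // Azure Schema Registry settings (keys as listed in this answer)
    props.put("schema.registry.url", "https://<MY-NAMESPACE>.servicebus.windows.net");
    props.put("schema.group", "<MY-SCHEMA-GROUP>");
    props.put("schema.registry.credential", tokenCredential);
    props.put("specific.avro.reader", true);
    // Generic value type: every Avro-generated class extends SpecificRecordBase
    props.put("specific.avro.value.type", SpecificRecordBase.class);

    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, SpecificRecordBase> kafkaListenerContainerFactory(
      ConsumerFactory<String, SpecificRecordBase> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, SpecificRecordBase> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
  }
}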

To summarize, these are all the settings we had to change:

value.subject.name.strategy=RecordNameStrategy.class
schema.registry.credential=tokenCredential

value.serializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroSerializer.class
value.deserializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class
specific.avro.reader=true
specific.avro.value.type=org.apache.avro.specific.SpecificRecordBase.class
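
As a usage illustration of the generic SpecificRecordBase approach, a hypothetical listener (the class name and log output are mine, the topic property comes from the example above) could narrow the received record back to the generated Message type:

import com.atradius.examples.Message;
import org.apache.avro.specific.SpecificRecordBase;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaAvroMessageListener {

  @KafkaListener(topics = "${example-app.kafka.producer.topic}", groupId = "kafka-example-application")
  public void onMessage(SpecificRecordBase avroRecord) {
    // Narrow the generic record to the concrete generated type
    if (avroRecord instanceof Message message) {
      System.out.printf("Received number=%s, description=%s%n",
          message.getNumber(), message.getDescription());
    }
  }
}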
