Avro serialization problem with an optional logical type


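I have no experience with Kafka and related technologies, so I need your help.

I am facing the following problem:

In my current project we use the Confluent Platform (Kafka + Schema Registry). For serialization/deserialization we decided to use Avro.

I created a Spring Boot application that acts as both producer and consumer of Kafka messages.

The problem appears when I try to send a Kafka message that contains the field shippedAt. In the schema it is defined as an optional field (default: null).

When I try to send the message to Kafka, the producer throws an exception: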

org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z

Have you had a similar experience? Or is there a better way to handle timestamps?

Message schema (the field is optional):

{
  "namespace": "sk.kubbo.eventhubpoc.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "product",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "shippedAt",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    }
  ]
}
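
For context on the type itself: per the Avro specification, timestamp-millis annotates a long that holds the number of milliseconds since the Unix epoch. The following is a minimal sketch of that mapping for a nullable value, using only java.time. The class and method names are hypothetical, and it only illustrates roughly what a conversion such as TimeConversions.TimestampMillisConversion has to do; it does not reproduce Avro's code.

```java
import java.time.Instant;

// Hypothetical helper, not part of Avro: shows the Instant <-> long mapping
// behind an optional timestamp-millis field such as shippedAt.
class TimestampMillisSketch {

    // Instant -> epoch milliseconds; null stays null (the "null" union branch).
    public static Long toEpochMillis(Instant shippedAt) {
        return shippedAt == null ? null : shippedAt.toEpochMilli();
    }

    // Epoch milliseconds -> Instant; null stays null.
    public static Instant fromEpochMillis(Long millis) {
        return millis == null ? null : Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant shippedAt = Instant.parse("2023-12-18T10:40:01.594Z");
        Long encoded = toEpochMillis(shippedAt);
        System.out.println(encoded);                  // 1702896001594
        System.out.println(fromEpochMillis(encoded)); // 2023-12-18T10:40:01.594Z
        System.out.println(toEpochMillis(null));      // null
    }
}
```

The round trip is lossless at millisecond precision; any sub-millisecond part of an Instant would be dropped by toEpochMilli().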

Full stack trace:

org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:947) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:440) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:906) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:307) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:157) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:108) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:92) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:159) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:192) ~[kafka-avro-serializer-7.5.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:162) ~[kafka-avro-serializer-7.5.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.5.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000) ~[kafka-clients-3.6.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947) ~[kafka-clients-3.6.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:787) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:756) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:545) ~[spring-kafka-3.1.0.jar:3.1.0]
    at sk.kubbo.eventhubpoc.producer.EventProducer.sendMessage(EventProducer.java:37) ~[classes/:na]
    at sk.kubbo.eventhubpoc.producer.EventProducer.createOrderEvent(EventProducer.java:32) ~[classes/:na]
    at sk.kubbo.eventhubpoc.controller.ProducerController.produceOrder(ProducerController.java:24) ~[classes/:na]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:254) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:182) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:917) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:829) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.16.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.16.jar:6.0]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.zalando.logbook.servlet.LogbookFilter.doFilter(LogbookFilter.java:76) ~[logbook-servlet-3.7.2.jar:na]
    at org.zalando.logbook.servlet.HttpFilter.doFilter(HttpFilter.java:32) ~[logbook-servlet-3.7.2.jar:na]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:340) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

If the value of shippedAt is null, it works fine. It also works when I modify the schema and make the field shippedAt required (see the snippet below).

{
  "namespace": "sk.kubbo.eventhubpoc.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "product",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "shippedAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

To generate the Java classes I use the Maven plugin org.apache.avro:avro-maven-plugin:1.11.3, with the following configuration:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro/event</sourceDirectory>
        <createSetters>false</createSetters>
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
        <fieldVisibility>private</fieldVisibility>
    </configuration>
</plugin>

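I noticed that when I generate the Java class from the second schema (where the field is required), the generated class contains a getConversion method. When the field is optional, that method is missing from the generated class.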

  private static final org.apache.avro.Conversion<?>[] conversions =
      new org.apache.avro.Conversion<?>[] {
      null,
      null,
      null,
      new org.apache.avro.data.TimeConversions.TimestampMillisConversion(),
      null
  };

  @Override
  public org.apache.avro.Conversion<?> getConversion(int field) {
    return conversions[field];
  }

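It looks like this may be caused by having multiple event types in one topic. I implemented that as described in the article https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/. When I use the Order class with a topic that allows only Order messages, it works.

I am really desperate now, because we need multiple event types in one topic: we have to preserve the order of the events.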

spring-boot apache-kafka spring-kafka avro confluent-platform
1 Answer

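OK, I managed to solve this.

I found a note (Writing Avro From Spark to Kafka) saying that this is a bug in the Avro dependency. That page also links to a StackOverflow thread, where I found the solution.

I added the following line to my application class, and now it works: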

SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
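
Why registering the conversion helps can be pictured with a toy model. The stack trace shows the failure inside GenericData.resolveUnion, that is, while a branch of the ["null", timestamp-millis] union is being chosen for a java.time.Instant value. The sketch below is a deliberately simplified, hypothetical stand-in and not Avro's implementation: the class UnionResolutionSketch, its map, and the use of plain strings for union branches are all assumptions made for illustration. It only shows the idea that a value's runtime class can be matched to a union branch once a conversion for that class has been registered on the shared data model.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Avro code) of choosing a union branch for a value.
class UnionResolutionSketch {

    // Runtime class -> name of the logical type a registered conversion handles.
    private final Map<Class<?>, String> conversionsByClass = new HashMap<>();

    // Stand-in for registering a conversion on the data model.
    public void addLogicalTypeConversion(Class<?> javaType, String logicalTypeName) {
        conversionsByClass.put(javaType, logicalTypeName);
    }

    // Returns the index of the union branch that can hold the value.
    public int resolveUnion(List<String> branches, Object datum) {
        if (datum == null) {
            return branches.indexOf("null");
        }
        String logicalType = conversionsByClass.get(datum.getClass());
        int index = logicalType == null ? -1 : branches.indexOf(logicalType);
        if (index < 0) {
            // No registered conversion knows this class: mirrors the reported error.
            throw new IllegalStateException(
                "Unknown datum type " + datum.getClass().getName() + ": " + datum);
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> shippedAtUnion = List.of("null", "timestamp-millis");
        Instant shippedAt = Instant.parse("2023-12-18T10:40:01.594Z");
        UnionResolutionSketch model = new UnionResolutionSketch();

        System.out.println(model.resolveUnion(shippedAtUnion, null)); // 0
        try {
            model.resolveUnion(shippedAtUnion, shippedAt);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unknown datum type java.time.Instant: ...
        }

        model.addLogicalTypeConversion(Instant.class, "timestamp-millis");
        System.out.println(model.resolveUnion(shippedAtUnion, shippedAt)); // 1
    }
}
```

In this picture a null value always resolves, which matches the observation that the message is sent fine when shippedAt is null. An Instant resolves only after the registration step, which is the role the addLogicalTypeConversion call plays in the fix above.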