Kafka 流应用程序想要检索错误的模式

问题描述 投票:0回答:1

我们在其中一个 Kafka 流应用程序中看到这样的错误:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_6, processor=KSTREAM-SOURCE-0000000000, topic=mytopicXYZ, partition=6, offset=465, stacktrace=org.apache.kafka.common.errors.SerializationException: Error retrieving Protobuf schema for id 571
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:177)
    at io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:76)
    at io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer.deserialize(KafkaProtobufDeserializer.java:27)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:58)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:427)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$5(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:887)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:318)
    at org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade.get(ReadOnlyKeyValueStoreFacade.java:35)
    at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:56)
    at com.lgt.pdcl.avaloqminer.currency.CurrencyService.getCurrencyEntry(CurrencyService.java:54)
    at com.lgt.pdcl.avaloqminer.currency.CurrencyService.getCurrencyEntryByKey(CurrencyService.java:50)
    at com.lgt.pdcl.avaloqminer.currency.CurrencyService.getCurrency(CurrencyService.java:28)
    at com.lgt.pdcl.avaloqminer.mapper.DocCordMapper.map(DocCordMapper.java:490)
    at com.lgt.pdcl.avaloqminer.mapper.DocCordMapper.map(DocCordMapper.java:194)
    at com.lgt.pdcl.avaloqminer.mapper.DocCordMapper.process(DocCordMapper.java:32)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)

模式注册表中不存在 ID 为 571 的模式,因此显然无法检索它。问题是,在该主题的事件中,存储了一个完全不同的架构 ID,并且该架构 ID 存在于注册表中,因此在 Kafka 服务器和注册表端,一切似乎都正常。不知何故,流应用程序的 id 错误,但我们不知道该 id 来自哪里。

我们尝试删除状态存储,流应用程序想要检索的ID已更改,但仍然是错误的。

apache-kafka apache-kafka-streams
1个回答
0
投票

错误来自

deserialize
;因此 schema-id 来自数据本身; schema-id 被编码在序列化值字节的前缀中。

我们尝试删除状态存储,

嗯,状态存储由变更日志主题支持。因此,如果更改日志中的数据已损坏,它只会从更改日志重新填充到存储中。

© www.soinside.com 2019 - 2024. All rights reserved.