A serializer is not compatible with the actual key type (key type: java.lang.String)

Problem description

I'm trying to join a stream with a GlobalKTable and I'm running into a serialization problem. The stream is a KStream<String, String> and the product table is a GlobalKTable<String, Product> (where Product is an Avro message). The following piece of code performs the join:

KStream<String, Product> detailedProductStream = productStream.join(
                productTable,                                                  // GlobalKTable<String, Product>
                (productStreamKey, productStreamValue) -> productStreamValue,  // map each stream record to the table's key
                (productStreamValue, productTableValue) -> productTableValue   // keep the table value as the join result
);

I have defined a SpecificAvroSerde for the Avro messages and Serdes.String() for the String-typed messages, yet I get the exception below. Any clues? Thanks.

Exception in thread "web-click-charts-a086d30c-5f8d-4e1f-8e3e-ab4b2ac8f03a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=PRODUCT-CURRENT, partition=0, offset=2, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
            at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:175)
            at org.apache.kafka.streams.state.internals.MeteredWindowStore.keyBytes(MeteredWindowStore.java:222)
            at org.apache.kafka.streams.state.internals.MeteredWindowStore.fetch(MeteredWindowStore.java:167)
            at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.fetch(ProcessorContextImpl.java:547)
            at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:94)
            at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
            at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
            at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)

The serdes are defined as follows:

Map<String, String> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

SpecificAvroSerde<Product> productSerde = new SpecificAvroSerde<>();
productSerde.configure(props, false);

Serde<String> stringSerde = Serdes.String();

GlobalKTable<String, Product> productTable = builder.globalTable("TOPIC-1",
                Consumed.with(stringSerde, productSerde));
KStream<String, String> productStream = builder.stream("TOPIC-2",
        Consumed.with(stringSerde, stringSerde));
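The exception message itself suggests one remedy: "Change the default Serdes in StreamConfig". A minimal sketch of that approach, assuming String keys and values are the common case in this topology (the application id and bootstrap server are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultSerdesConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "web-click-charts");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");         // placeholder
        // Default serdes used wherever no explicit serde is passed to an operator.
        // Without these, Kafka Streams falls back to ByteArraySerde, which is
        // exactly the ByteArraySerializer/String mismatch seen in the stack trace.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
```

Operators that need a different serde (such as the Avro-valued table) can still override the defaults locally via `Consumed.with(...)`, as in the snippet above.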
apache-kafka-streams
1 Answer

How are you specifying the serdes? Your code snippet does not show that part.

However, the stack trace indicates that the problem is not in the stream-globalTable join but in a windowed stream-stream join (the stack trace shows KStreamKStreamJoin), and the error message shows that a ByteArraySerializer is being used for String keys.
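If the topology does contain a windowed KStream-KStream join elsewhere, that join materializes window state stores, and those stores use the default serdes unless explicit ones are supplied. A hedged sketch of passing them in, assuming two String/String streams named `leftStream` and `rightStream` (hypothetical names; on Kafka Streams 2.4+ this is done with `StreamJoined`, on older versions with the `Joined`-style overload):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class WindowedJoinSerdes {
    static KStream<String, String> joinWithExplicitSerdes(
            KStream<String, String> leftStream,
            KStream<String, String> rightStream) {
        return leftStream.join(
                rightStream,
                (leftValue, rightValue) -> leftValue + rightValue,  // example joiner
                JoinWindows.of(Duration.ofMinutes(5)),
                // key serde, this-value serde, other-value serde for the
                // join's window stores; prevents the ByteArraySerde fallback
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
    }
}
```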
