聚合使用错误的序列化程序

问题描述 投票:0回答:1

我正在使用kafka-streams应用程序来处理日志事件。在这种情况下,我想将WorkflowInput类型聚合到工作流类型中。我在使聚合工作时遇到问题。

final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
    .peek((k, v) -> {
        if (!(v instanceof WorkflowInput)) {
            throw new AssertionError("Type not expected");
        }
    })
    .groupByKey()
    .<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
            Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));

我得到以下异常:引起:org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).

需要注意的两件事:*值序列化器是一个StringSerializer,而我使用withValueSerde配置了不同的东西。 *实际值类型是WorkflowInput,而我期望Workflow,因为那是我的聚合值类型。

我是kafka-streams的新手,所以我可能会遗漏一些明显的东西,但我无法弄清楚。我在这里错过了什么?

apache-kafka-streams
1个回答
1
投票

如果从配置中覆盖默认的Serde,则它在运算符就地覆盖中。它没有传播到下游(Kafka 2.0--有WIP来改善这一点)。

因此,你需要将你在Serde中使用的someStream = builder.stream(...)s传递给.groupByKey(Serialized.with(...))

© www.soinside.com 2019 - 2024. All rights reserved.