Kafka Streams主题中的多个对象和deserizalization

问题描述 投票:1回答:2

在我的kafka流应用程序中,我使用一个主题用于多个对象类型,seriazliazed为JSON。我使用类名作为键,我的想法是,消费者将按键过滤传入条目的子集,并从JSON反序列化它们。我假设我可以在不定义serdes的情况下应用初始过滤,但在这种情况下,源流被推断为<Object,Object>,并且以下代码无法编译:

 return streamsBuilder.stream("topic")
            .filter((k, v) -> k.equals("TestClassA"))
            .groupByKey()
            .reduce((oldValue, newValue) -> newValue,
                    Materialized.<String, TestClassA, KeyValueStore<Bytes, byte[]>>as(StoreManager.STORE_NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TestClassA.class)));

如果我向流定义添加类型,它会编译:

return streamsBuilder.stream(businessEntityTopicName, Consumed.with(Serdes.String(), new JsonSerde<>(TestClassA.class))) {...}

但是在这种情况下,当例如TestClassB的对象出现在主题中时,我得到运行时异常。这种情况的最佳做法是什么,或者我应该针对不同的对象使用不同的主题?

apache-kafka apache-kafka-streams
2个回答
2
投票

如果你没有在Serde中指定任何#stream()并且不覆盖StreamsConfig的默认值,那么Kafka Streams将使用字节数组serdes。因此,你会得到

KStream<byte[], byte[]> streams = builder.<byte[], byte[]>stream("topicName");

请注意,如果您没有在右侧指定正确的类型,Java本身会回退到KStream<Object, Object>,如上所示。但两种情况下运行时的实际类型都是byte[]

因此,您可以应用过滤器,但它需要处理byte[]数据类型。

我想,你真正想做的是只为密钥指定一个StringSerde

KStream<String, byte[]> streams = builder.<String, byte[]>("topicName", Consumed.with(Serdes.String(), null)); // null with fall back to defaul Serde from StreamConfig

这允许您在filter()操作之前应用基于String键的groupByKey()


0
投票

我有一个类似的用例。我使所有可能的对象继承了一个公共接口(Event)并使用@JsonTypeInfo进行注释,因此jackson可以正确地反序列化该对象。

streamsBuilder.stream("topic")//need to add some sort of JSONSerde<Event> to this stream call, i use personally use the one bundled with spring
            .filter((k, v) -> v instanceOf testClassA)
© www.soinside.com 2019 - 2024. All rights reserved.