Kafka Streams:获取时间窗口中的事件计数

问题描述 投票:0回答:2

我有数据流作为事件。我想获取 10 分钟时间窗口内的事件计数并输出到另一个主题。以下是我的代码

StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
            .count()
            .toStream()
            .to("output");

但我收到错误

 ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
apache-kafka apache-kafka-streams
2个回答
2
投票

windowedBy
加上
count()
的结果是类型为
<Windowed<String>, Long>
的键值对,因此您需要通过
to()
参数在
Produced
中设置不同的serde。默认情况下,将使用配置中的 Serdes,看起来您设置为
StringSerde/StringSerde
并且这些显然与输出主题键/值类型不匹配。

Kafka Streams 附带用于窗口类型的内置 Serdes,您可以通过

Serdes
工厂类获得。


0
投票

可以尝试下面的方法

        .count()
        .toStream()
        .map((key, value) -> new KeyValue<>(key.key() + "@" + key.window().start() + "->" + key.window().end(), value))
        .to("output");
© www.soinside.com 2019 - 2024. All rights reserved.