Kafka Streams API: incompatible types with session windows


I have the following code snippet:

groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(aggregateValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));

I get the following compilation error:

Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>

I know that if I inline windowedBy like this

        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(aggregateValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));

it works, but I'm not sure how to split it into two separate statements...

java apache-kafka kafka-consumer-api apache-kafka-streams kafka-producer-api
1 Answer

There are two problems here.

The first problem is that KGroupedStream.windowedBy(SessionWindows) returns an instance of SessionWindowedKStream<K, V>, and in your first example

groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

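you don't capture the returned SessionWindowedKStream in a variable, so the later reduce runs on the original, unwindowed groupedStream.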

The second problem is in your first code example, where you have

KTable<byte[], byte[]> mergedTable

when it should be

KTable<Windowed<byte[]>, byte[]> mergedTable

as in your second example.

If you change the code to

SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<Windowed<byte[]>, byte[]> mergedTable = 
      sessionWindowedKStream
                .reduce((aggregateValue, newValue) -> {...

then it should compile fine.
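
To see why discarding the return value changes the key type, here is a minimal sketch that runs without Kafka on the classpath. The classes GroupedStream, WindowedStream, Table and Windowed are hypothetical stand-ins written only for this illustration; they mimic the shape of KGroupedStream, SessionWindowedKStream, KTable and Windowed but are not the Kafka Streams API, and keyKind() is an invented helper for printing.

```java
import java.util.function.BinaryOperator;

class WindowedByTypeSketch {

    // Hypothetical stand-in for Windowed<K>: marks a key as window-scoped.
    static final class Windowed<K> { }

    // Hypothetical stand-in for KTable<K, V>; it only remembers what kind of key it has.
    static final class Table<K, V> {
        private final String keyKind;
        Table(String keyKind) { this.keyKind = keyKind; }
        String keyKind() { return keyKind; }
    }

    // Hypothetical stand-in for SessionWindowedKStream<K, V>:
    // reduce() yields a table keyed by Windowed<K>.
    static final class WindowedStream<K, V> {
        Table<Windowed<K>, V> reduce(BinaryOperator<V> reducer) {
            return new Table<>("windowed");
        }
    }

    // Hypothetical stand-in for KGroupedStream<K, V>.
    static final class GroupedStream<K, V> {
        // Returns a NEW object; the receiver itself is left unchanged.
        WindowedStream<K, V> windowedBy(long inactivityGapSeconds) {
            return new WindowedStream<>();
        }
        // Without windowing, reduce() yields a table keyed by plain K.
        Table<K, V> reduce(BinaryOperator<V> reducer) {
            return new Table<>("plain");
        }
    }

    static String keyKindWhenResultDiscarded() {
        GroupedStream<byte[], byte[]> grouped = new GroupedStream<>();
        grouped.windowedBy(30); // result thrown away, so this line has no effect
        Table<byte[], byte[]> table = grouped.reduce((agg, next) -> next);
        return table.keyKind();
    }

    static String keyKindWhenResultCaptured() {
        GroupedStream<byte[], byte[]> grouped = new GroupedStream<>();
        WindowedStream<byte[], byte[]> windowed = grouped.windowedBy(30);
        Table<Windowed<byte[]>, byte[]> table = windowed.reduce((agg, next) -> next);
        return table.keyKind();
    }

    public static void main(String[] args) {
        System.out.println(keyKindWhenResultDiscarded()); // prints "plain"
        System.out.println(keyKindWhenResultCaptured());  // prints "windowed"
    }
}
```

In the discarded case the table key is a plain byte[], which is why a suppression built for windowed keys is rejected by the compiler; in the captured case the key is Windowed<byte[]> and the types line up.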

HTH, Bill
