Kafka Streams物化视图与Kotlin

问题描述 投票:1回答:2

要用Java创建kafka流状态存储,我可以这样做:

final KGroupedStream<String, String> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word);

wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(WORD_COUNT_STORE));

我试图将其转换为Kotlin,如下所示:

val wordCounts: KGroupedStream<String, String> = textLines
        .flatMapValues({value -> value.split("\\W+") })
        .groupBy({ _, word -> word})

wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, Array<Byte>>>as(WORD_COUNT_STORE))

但是,我得到以下编译器错误:

Interface KeyValueStore does not have constructors

我需要做什么?

kotlin apache-kafka apache-kafka-streams
2个回答
2
投票

由于as是Kotlin中的保留词,尝试用反引号包围as,即

`as`

2
投票

如果它对其他任何人都有用,以及Raman建议的反引号,我不得不做出其他一些改动:

  • 首先,需要在as方法之后指定泛型类型,而不是在Materialized类之后。
  • 其次,我不得不使用Array<Byte>而不是使用ByteArray

所以适合我的全部代码是:

wordCounts.count(Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>(WORD_COUNT_STORE))
© www.soinside.com 2019 - 2024. All rights reserved.