如何映射输入KStream 成 使用KeyValueMapper?

问题描述 投票:0回答:1

接收我想在CarClass上映射的Json数据并想要创建新流但map方法不允许我在自定义数据类型上映射KStream类型中的方法映射(KeyValueMapper>)不适用于参数( new KeyValueMapper>(){})?

apache-kafka apache-kafka-streams
1个回答
4
投票

来自http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

该示例将值类型从byte[]更改为Integer。对于StringCarClass会是一样的。

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
  (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

// Java 7 example
KStream<String, Integer> transformed = stream.map(
  new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
    @Override
    public KeyValue<String, Integer> apply(byte[] key, String value) {
      return new KeyValue<>(value.toLowerCase(), value.length());
    }
  });

但是,如果您只想修改该值,我建议使用mapValues()而不是map()

© www.soinside.com 2019 - 2024. All rights reserved.