在Kafka流中执行映射操作后,双引号中的字符串

问题描述 投票:2回答:1

我使用Kafka流DSL和地图将KStream<String, JsonNode>转换为KStream<String, String>

ValueMapper函数中,我简单地返回new ValueMapper("key", "some constant string"),但是将值发送回Kafka使用KStream.to("some topic"),我得到的结果是添加双引号。

我的代码是这样的:

KStream<String, JsonNode> views = builder.stream("fromTopic");
views.map(new ValueKMapper()).to("toTopic");

ValueMapper只是实现KeyValueMapperapply()的代码是:

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

然后当我消耗toTopic时,我得到了""hello"",并添加了引号。

也许这是卡夫卡流的错误?

apache-kafka apache-kafka-streams
1个回答
0
投票

我假设您在问题中提供的方法apply()

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>("a", "hello");
}

没有将硬编码的密钥和值传递给KeyValue的构造函数。我猜这个问题与JsonNode有关。也许,你的方法的实际实现使用value.get(key)

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key));
}

但是,value.get(key)将返回TextNode,而toString()方法将返回TextNode的字符串表示,包括引号。为了正确解析JsonNode,你需要使用textValue()方法,所以你的方法将成为

public KeyValue<String, String> apply(String key, JsonNode value) 
{
    return new KeyValue<String,String>(key, value.get(key).textValue());
}

示例:假设您有一个密钥a和一个值hello

json.get("a").toString())

将返回"hello"

json.get("a").textValue();

将返回hello

© www.soinside.com 2019 - 2024. All rights reserved.