我使用Kafka流DSL和地图将KStream<String, JsonNode>
转换为KStream<String, String>
。
在ValueMapper
函数中,我简单地返回new ValueMapper("key", "some constant string")
,但是将值发送回Kafka使用KStream.to("some topic")
,我得到的结果是添加双引号。
我的代码是这样的:
KStream<String, JsonNode> views = builder.stream("fromTopic");
views.map(new ValueKMapper()).to("toTopic");
和ValueMapper
只是实现KeyValueMapper
和apply()
的代码是:
public KeyValue<String, String> apply(String key, JsonNode value)
{
return new KeyValue<String,String>("a", "hello");
}
然后当我消耗toTopic
时,我得到了""hello""
,并添加了引号。
也许这是卡夫卡流的错误?
我假设您在问题中提供的方法apply()
public KeyValue<String, String> apply(String key, JsonNode value)
{
return new KeyValue<String,String>("a", "hello");
}
没有将硬编码的密钥和值传递给KeyValue
的构造函数。我猜这个问题与JsonNode
有关。也许,你的方法的实际实现使用value.get(key)
即
public KeyValue<String, String> apply(String key, JsonNode value)
{
return new KeyValue<String,String>(key, value.get(key));
}
但是,value.get(key)
将返回TextNode
,而toString()
方法将返回TextNode
的字符串表示,包括引号。为了正确解析JsonNode
,你需要使用textValue()
方法,所以你的方法将成为
public KeyValue<String, String> apply(String key, JsonNode value)
{
return new KeyValue<String,String>(key, value.get(key).textValue());
}
示例:假设您有一个密钥a
和一个值hello
,
json.get("a").toString())
将返回"hello"
而
json.get("a").textValue();
将返回hello