我正在尝试检查kafka输出消息中是否存在该密钥,如果存在则进行进一步的操作。 kafka 主题的输出消息如下
["random_name_547hcg": {
"access_Data": {
"name": "BU",
"repo": "CRX",
"Id": "908hngt"
},
"level": "easy",
"check_status": {
"prod": 0,
"dev": 3,
"QA": 2,
"UAT": 1
},
"Test_Result": {
"results": [
{
"id": "a20e025e0451eeed05d94a4cfc4523e58a",
"name": "ImageVerify",
"Release": "3.0",
"wrapper": [
"All"
]
}
]
},
"output": "failure"
}]
输出消息的数据类型为
java.util.LinkedHashMap
我的代码片段
import groovy.json.JsonSlurper
import kafkarelatedpackage
key = 'random_name_547hcg'
def slurper = new groovy.json.JsonSlurper()
def kafConsumer = new ConsumerClassKafka(servers, topic)
def kafmsg = kafConsumer.getMessages()
messg = slurper.parseText(kafmsg)
if (messg.containsKey(key)) {
println "Yes"
} else {
println "No"
}
但是它抛出了
Possible solutions: parseText(java.lang.String), parse(java.io.File), parse(java.io.Reader), parse(java.net.URL)
我做错了什么?
您显示的输出是
.toString()
“序列化”;但问题中显示的文字
已更改(.toString()
不会在键周围添加引号,并且
.inspect()
会添加单引号对于后者,您只需稍微交换一下即可。你想要
JSON 反序列化 random_name_547hcg
键的
value:
def msg = slurper.parseText(kafmsg.get('random_name_547hcg))
。
对于前一部分:groovy
.toString()
不适合序列化
数据,不得使用。修复生产者。使用 JSON 或任何你喜欢的东西
看看合适。