我刚刚开始使用ksql,当我从一开始做打印主题时,我得到的数据是下面的格式。
rowtime: 4/12/20, 9:00:05 AM MDT, key: {"messageId":null}, value: {"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}
但在KSQL中,所有的例子都有以下格式的数据。
{"ROWTIME":1537436551210,"ROWKEY":"3375","rating_id":3375,"user_id":2,"stars":3,"route_id":6972,"rating_time":1537436551210,"channel":"web","message":"airport refurb looks great, will fly outta here more!"}
所以我无法进行任何操作,格式显示为
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
在我的题目上。如何将数据修改成特定的格式?
谅谅
ksqlDB还不支持JSON消息键,(请看跟踪报道) Github问题).
但是,你仍然可以访问数据,包括键和值。 JSON键毕竟只是一个字符串!
值,当重新格式化后,看起来像这样。
{
"WHS":[
{
"Character Set":"UTF-8",
"action":"finished",
"Update-Date-Time":"2020-04-11 09:00:02:25",
"Number":0,
"Abbr":"",
"Name":"",
"Name2":"",
"Country-Code":"",
"Addr-1":"",
"Addr-2":"",
"Addr-3":"",
"Addr-4":"",
"City":"",
"State":""
}
]
}
假设所有的行都有一个共同的格式 ksqlDB可以很容易地处理。
要导入你的流,你应该能够运行这样的东西。
-- assuming v0.9 of Kafka
create stream stuff
(
ROWKEY STRING KEY,
WHS ARRAY<
STRUCT<
`Character Set` STRING,
action STRING,
`Update-Date-Time` STRING,
Number STRING,
... etc
>
>
)
WITH (kafka_topic='?', value_format='JSON');
值列 WHS
是一个结构的数组,(其中将只有一个元素),结构定义了你需要访问的所有字段。 注意,有些字段名需要加引号,因为它们包含无效字符,如空格和破折号。