Kafka中KSQL操作的主题数据格式

问题描述 投票:0回答:1

我刚刚开始使用ksql,当我从一开始做打印主题时,我得到的数据是下面的格式。

   rowtime: 4/12/20, 9:00:05 AM MDT, key: {"messageId":null}, value: {"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

但在KSQL中,所有的例子都有以下格式的数据。

{"ROWTIME":1537436551210,"ROWKEY":"3375","rating_id":3375,"user_id":2,"stars":3,"route_id":6972,"rating_time":1537436551210,"channel":"web","message":"airport refurb looks great, will fly outta here more!"}

所以我无法进行任何操作,格式显示为

Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING

在我的题目上。如何将数据修改成特定的格式?

谅谅

apache-kafka confluent ksqldb
1个回答
1
投票

ksqlDB还不支持JSON消息键,(请看跟踪报道) Github问题).

但是,你仍然可以访问数据,包括键和值。 JSON键毕竟只是一个字符串!

值,当重新格式化后,看起来像这样。

{
  "WHS":[
    {
      "Character Set":"UTF-8",
      "action":"finished",
      "Update-Date-Time":"2020-04-11 09:00:02:25",
      "Number":0,
      "Abbr":"",
      "Name":"",
      "Name2":"", 
      "Country-Code":"",
      "Addr-1":"",
      "Addr-2":"",
      "Addr-3":"",
      "Addr-4":"",
      "City":"",
      "State":""
    }
  ]
}

假设所有的行都有一个共同的格式 ksqlDB可以很容易地处理。

要导入你的流,你应该能够运行这样的东西。

-- assuming v0.9 of Kafka
create stream stuff 
  (
    ROWKEY STRING KEY,
    WHS ARRAY<
      STRUCT<
        `Character Set` STRING,
        action STRING,
        `Update-Date-Time` STRING,
        Number STRING,
        ... etc
      >
    >
   )
   WITH (kafka_topic='?', value_format='JSON');

值列 WHS 是一个结构的数组,(其中将只有一个元素),结构定义了你需要访问的所有字段。 注意,有些字段名需要加引号,因为它们包含无效字符,如空格和破折号。

© www.soinside.com 2019 - 2024. All rights reserved.