KStream的问题,以avro格式读取前后的有效载荷。

问题描述 投票:0回答:1

我正在汇流Kafka平台上开发KStream应用。数据以嵌套的avro格式存在于源主题上,如下所述。

{
   "type":"record",
   "namespace":"xyz",
   "table":"abc",
   "op_type":{
      "string":"D"
   },
   "op_ts":{
      "string":"2020-05-16 09:03:25.000462"
   },
   "pos":{
      "string":"00000000000000010722",
      "before":{
[
            "fields":            {
               "column1":"value"
            },
            {
               "column2":"value"
            },
            {
               "column3":"value"
            }
         ]
      },
      "after":{
[
            "fields":            {
               "column1":"value"
            },
            {
               "column2":"value"
            },
            {
               "column3":"value"
            }
         ]
      }
   }

我想根据op_type="D "来过滤记录,因为我想把删除的记录与其他Kafka主题分开。

我在输出主题上写模式时面临着反序列化错误:for required row found array的问题,我使用apache maven-avro插件创建了POJO对象。

我使用apache maven-avro插件创建了POJO对象。

在 "before tag "中,我把类型定为数组,并把before类的对象传给它,在 "after tag "中,我也把类型定为数组,并把after的对象传给它。

需要解决如何解决这个嵌套的模式或任何其他方式来过滤删除记录到其他kafka主题从源主题。

apache-kafka avro apache-kafka-streams confluent
1个回答
0
投票

为了解决上述问题,我使用通用的avro序列化器来使用Kafka流处理数据。

如果我们使用特定的avro序列化器,我们就不能过滤嵌套数据。使用通用的avro序列化器将有助于用字符串过滤特定的记录。我使用了,并解决了上述问题。

© www.soinside.com 2019 - 2024. All rights reserved.