为jdbc sink连接器提取和转换kafka消息的特定字段。

问题描述 投票:0回答:2

我有一个kafka主题,使用Debezium mysql源连接器从mysql数据库获取数据,以下是其中一个消息的格式。

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "[email protected]"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "[email protected]"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

我想推送 ts_ms, before, afterid (从objectrow)列到另一个数据库,使用jdbc sink连接器,表的模式是 (id,before(text),after(text),timestamp)我是一个新的kafka新手,不知道该怎么做。

  • 我怎样才能从消息中只提取这些字段来推送而忽略其他字段?

  • 我怎么能把之前,之后的字段转换成字符串序列化格式?

  • 我怎么能提取 id 从对象? (如果是插入操作,前面为空,如果是删除,后面为空)

对于上面的消息,sink目的表的最后应该有下面这样的数据。

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"[email protected]"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"[email protected]"}'
timestamp: 1465491411815
mysql apache-kafka transformation apache-kafka-connect debezium
2个回答
0
投票

你可以使用连锁的 Kafka连接转换,像这样 解决办法.


0
投票

你可以创建一个DTO(Java对象为你的json有效载荷,你从你的kafka主题)利用这个在线转换器帮助你转换你的json到Java对象。[http:/pojo.sodhanalibrary.com][1]。

一旦你从你的kafka主题中接收到你的消息,你就可以使用objectmapper来转换json,并将其映射到你相应的DTO对象中。你可以利用这个对象来提取你想要的字段,只需调用getId(),getBefore()等。

下面是一些参考代码,有助于你理解。

    @KafkaListener(topics = "test")
        public void listen(String payload)  {

            logger.info("Message Received from Kafka topic: {}", payload);

            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);

                logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));

                logger.info("Get Before:{}", dtoObject.getId());



        }
© www.soinside.com 2019 - 2024. All rights reserved.