使用 JDBC 源连接器将嵌套 JSON 从 Snowflake 流式传输到 Kafka

问题描述 投票:0回答:1

我正在尝试建立一个数据管道,其中来自 Snowflake 的 SQL 查询结果(使用

OBJECT_CONSTRUCT
函数构造为嵌套 JSON)流式传输到 Kafka 主题。我想仅使用配置来实现此目的,而不使用自定义编码。

我设法设置一个 JDBC 源连接器,将消息发送到执行以下查询的 Kafka 主题:

select 
    object_construct(
        'id', id, 
        'nestedField', object_construct(
            'nestedvalue', value
        )
    ) as myRecord
from myTable;

使用

"value.converter": "org.apache.kafka.connect.json.JsonConverter"
设置Kafka消息看起来像这样

{ "MYRECORD": "{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"}

添加到连接器配置后,添加一个字段转换器

    "transforms": "extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractField.field": "MYRECORD",

讯息就是这样

"{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"

这不是我真正想要的。 我的目标是拥有一个纯 JSON 值像这样

{"id":1,"nestedField":{"nestedValue":"value"}}

问题是如何配置源连接器来实现呢?同样,理想情况下我不想编写任何自定义代码,例如实现自己的 SMT。我还想避免使用 ksqlDB。

可能值得单独询问,但它仍然与我的问题有关。如果我使用 base64 对原始值进行编码,在将消息发布到主题之前,是否有任何简单的方法可以将其解码回来(使用 SMT)?

apache-kafka snowflake-cloud-data-platform apache-kafka-connect
1个回答
0
投票

这比看起来容易。

只需将值转换器更改为

org.apache.kafka.connect.storage.StringConverter
即可达到目的,并且相同的
extractField
转换有助于从结构中获取值。

毕竟 JDBC 源连接器的配置看起来像这样

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:snowflake://<MY_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com/?warehouse=<MY_WAREHOUSE>&db=<MY_DB>&role=<MY_ROLE>&schema=<MY_SCHEMA>&private_key_file=<MY_KEY_PATH>&private_key_file_pwd=<MY_KEY_PASS>",
    "query": "SELECT OBJECT_CONSTRUCT( 'id', id,  'nestedField', object_construct( 'nestedValue', value ) ) AS RECORD FROM my_db.my_schema.my_table;",
    "mode": "bulk",
    "tasks.max": "1",
    "topic.prefix": "jdbc_source_snowflake",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractField.field": "RECORD"
}
© www.soinside.com 2019 - 2024. All rights reserved.