我正在尝试建立一个数据管道,其中来自 Snowflake 的 SQL 查询结果(使用
OBJECT_CONSTRUCT
函数构造为嵌套 JSON)流式传输到 Kafka 主题。我想仅使用配置来实现此目的,而不使用自定义编码。
我设法设置一个 JDBC 源连接器,将消息发送到执行以下查询的 Kafka 主题:
select
object_construct(
'id', id,
'nestedField', object_construct(
'nestedvalue', value
)
) as myRecord
from myTable;
使用
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
设置Kafka消息看起来像这样
{ "MYRECORD": "{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"}
添加到连接器配置后,添加一个字段转换器
"transforms": "extractField",
"transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractField.field": "MYRECORD",
讯息就是这样
"{\"id\":1,\"nestedField\":{\"nestedValue\":\"value\"}}"
这不是我真正想要的。 我的目标是拥有一个纯 JSON 值像这样
{"id":1,"nestedField":{"nestedValue":"value"}}
问题是如何配置源连接器来实现呢?同样,理想情况下我不想编写任何自定义代码,例如实现自己的 SMT。我还想避免使用 ksqlDB。
可能值得单独询问,但它仍然与我的问题有关。如果我使用 base64 对原始值进行编码,在将消息发布到主题之前,是否有任何简单的方法可以将其解码回来(使用 SMT)?
这比看起来容易。
只需将值转换器更改为
org.apache.kafka.connect.storage.StringConverter
即可达到目的,并且相同的 extractField
转换有助于从结构中获取值。
毕竟 JDBC 源连接器的配置看起来像这样
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:snowflake://<MY_SNOWFLAKE_ACCOUNT>.snowflakecomputing.com/?warehouse=<MY_WAREHOUSE>&db=<MY_DB>&role=<MY_ROLE>&schema=<MY_SCHEMA>&private_key_file=<MY_KEY_PATH>&private_key_file_pwd=<MY_KEY_PASS>",
"query": "SELECT OBJECT_CONSTRUCT( 'id', id, 'nestedField', object_construct( 'nestedValue', value ) ) AS RECORD FROM my_db.my_schema.my_table;",
"mode": "bulk",
"tasks.max": "1",
"topic.prefix": "jdbc_source_snowflake",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"transforms": "extractField",
"transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractField.field": "RECORD"
}