我正在开发一个与使用 Kafka Connect 构建流式变更数据捕获相关的项目。更改的来源是 MySQL,然后将它们发送到相应的 Kafka 主题,然后使用 ClickHouse Kafka Connect Sink 应用于 ClickHouse 中的表。 INSERT 和 UPDATE 等操作可以正常处理,但是根据文档,此接收器连接器无法处理 DELETE 等操作(https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#limitations:~ :text=Deletes%20are%20not%20supported)以及使用体验
这个问题有什么可能的解决方案吗?
我尝试在 MySQL 连接器中使用 InsertField 转换和 RecordIsTombstone 谓词来解决这个问题,例如:
{
"name": "mysql-source-connector",
...,
"predicates": "IsTombstone",
"predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms": "MarkAsDeleted",
"transforms.MarkAsDeleted.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.MarkAsDeleted.static.field": "is_deleted",
"transforms.MarkAsDeleted.static.value": 1,
"transforms.MarkAsDeleted.predicate": "IsTombstone"
}
在这里,如果 RecordIsTombstone 返回 true,那么我们插入值为 1 的 is_deleted 字段(如果不是 - 不执行任何操作,因为 ClickHouse 表中的 is_deleted 字段默认值为 0),然后在 ClickHouse 端我们已经构建了一个材质我们删除 WHERE is_deleted = 1 的表示。但是这个解决方案没有导致任何结果,因为 ClickHouse Kafka Connect Sink 简单地忽略了 Kafka 主题中具有键值“op”的所有记录:“d”(一种“操作”) ”:“已删除”)。
您将需要使用ReplacingMergeTree。这需要你:
行将使用表的列顺序进行唯一标识。
详情https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree