使用 ClickHouse Kafka Connect Sink 处理 DELETE 操作

问题描述 投票:0回答:1

我正在开发一个与使用 Kafka Connect 构建流式变更数据捕获相关的项目。更改的来源是 MySQL,然后将它们发送到相应的 Kafka 主题,然后使用 ClickHouse Kafka Connect Sink 应用于 ClickHouse 中的表。 INSERT 和 UPDATE 等操作可以正常处理,但是根据文档,此接收器连接器无法处理 DELETE 等操作(https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#limitations:~ :text=Deletes%20are%20not%20supported)以及使用体验

这个问题有什么可能的解决方案吗?

我尝试在 MySQL 连接器中使用 InsertField 转换和 RecordIsTombstone 谓词来解决这个问题,例如:

{
    "name": "mysql-source-connector",    
    ...,
    "predicates": "IsTombstone",
    "predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "transforms": "MarkAsDeleted",
    "transforms.MarkAsDeleted.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.MarkAsDeleted.static.field": "is_deleted",
    "transforms.MarkAsDeleted.static.value": 1,
    "transforms.MarkAsDeleted.predicate": "IsTombstone"
}

在这里,如果 RecordIsTombstone 返回 true,那么我们插入值为 1 的 is_deleted 字段(如果不是 - 不执行任何操作,因为 ClickHouse 表中的 is_deleted 字段默认值为 0),然后在 ClickHouse 端我们已经构建了一个材质我们删除 WHERE is_deleted = 1 的表示。但是这个解决方案没有导致任何结果,因为 ClickHouse Kafka Connect Sink 简单地忽略了 Kafka 主题中具有键值“op”的所有记录:“d”(一种“操作”) ”:“已删除”)。

apache-kafka apache-kafka-connect clickhouse change-data-capture clickhouse-kafka
1个回答
0
投票

您将需要使用ReplacingMergeTree。这需要你:

  1. 再次发送整个记录以获取更高版本号的更新
  2. 对于删除,您可以发送包含 is_deleted 列的记录

行将使用表的列顺序进行唯一标识。

详情https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree

© www.soinside.com 2019 - 2024. All rights reserved.