Kafka 清理策略“COMPACT”:消息未从主题中清除

问题描述 投票:0回答:0

我有 2 个主题 topic1 和 topic2 我正在将消息生成到测试主题中 topic1的配置如下: 主题:topic1 TopicId: PartitionCount:1 ReplicationFactor:1 配置:min.insync.replicas=1,cleanup.policy=delete,retention.ms=24192000000,retention.bytes=-1 主题:topic1 分区:0 领导者:0 副本:0 Isr:0

topic2的配置如下: 主题:topic2 TopicId: PartitionCount:1 ReplicationFactor:1 配置:min.insync.replicas=1,cleanup.policy=compact,retention.ms=60000,retention.bytes=-1 主题:topic2 分区:0 领导者:0 副本:0 Isr:0

测试主题中的消息如下:

消息1:

键: { “COl1”:{ “字符串”:“虚拟001” } }

价值:

{ “数据”: { “Col1”:“虚拟001”, “第2列”:“2023-01-01 01:01:01”, “第3列”:“1000.100” }, “操作”:“插入”, “时间戳”:“2023-01-01 01:01:01” }

消息2:

键: { “COl1”:{ “字符串”:“虚拟001” } }

价值:

{ “数据”: { “Col1”:“虚拟001”, "Col2": "2023-02-02 02:02:02", “第3栏”:“2000.100” }, “操作”:“更新”, “时间戳”:“2023-02-02 02:02:02” }

我创建了一个 Kafka 流“stream1”来从 topic1 中提取数据 下面是数据在stream1中的样子:

+------------------------------------------------ -------------------------------------------------- ------------------------------------------------+ |数据| +------------------------------------------------ -------------------------------------------------- -----------------------------------------------------------+ |{COL1=dummy001, COL2=2023-01-01 01:01:01, COL3=1000.100} | |{COL1=dummy001, COL2=2023-02-02 02:02:02, COL3=2000.100}

我创建了另一个 Kafka 流“stream2”并将 col1 定义为键列。

ksql> 使用(kafka_topic='topic2', value_format='AVRO')创建流stream2 (col1 string key, col2 string, col3 string);

然后我将值从stream1插入到stream2中 ksql> 插入到流2中,通过数据->col1从流1分区中选择数据->COL1作为COL1,数据->COL2作为COL2,数据->COl3作为COL3;

我在topic2中的数据如下:

消息1: 关键:

虚拟001

价值:

{ “COL2”:{ “字符串”:“2023-01-01 01:01:01” }, “COL3”:{ “字符串”:“1000.100” } } 消息2: 钥匙: 假人001 值:

{ “COL2”:{ “字符串”:“2023-02-02 02:02:02” }, “COL3”:{ “字符串”:“2000.100” } }

                                                                   |

我面临的问题是符合清理政策“COMPACT”的。尽管我已经设置了清理策略来压缩我的邮件,但我的邮件并未被删除。正如我上面提到的,我当前如何使用stream1和stream2处理从topic1到topic2的情况。即使将键列定义为 col1 后,清理策略也不起作用。 当我将清理策略设置为“压缩,删除”时,它会删除所有消息。

根据我的理解,由于消息有一个键列,因此它应该根据每条消息的键列保留最新消息并清除旧消息。但这不起作用。

如果我遗漏了什么,请告诉我。

我想要实现的是在 topic2 中的任何给定时间点根据 col1 保留最新消息。

apache-kafka apache-kafka-streams
© www.soinside.com 2019 - 2024. All rights reserved.