我有 2 个主题 topic1 和 topic2
我正在将消息生成到测试主题中
topic1的配置如下:
主题:topic1 TopicId:
topic2的配置如下:
主题:topic2 TopicId:
测试主题中的消息如下:
消息1:
键: { “COl1”:{ “字符串”:“虚拟001” } }
价值:
{ “数据”: { “Col1”:“虚拟001”, “第2列”:“2023-01-01 01:01:01”, “第3列”:“1000.100” }, “操作”:“插入”, “时间戳”:“2023-01-01 01:01:01” }
消息2:
键: { “COl1”:{ “字符串”:“虚拟001” } }
价值:
{ “数据”: { “Col1”:“虚拟001”, "Col2": "2023-02-02 02:02:02", “第3栏”:“2000.100” }, “操作”:“更新”, “时间戳”:“2023-02-02 02:02:02” }
我创建了一个 Kafka 流“stream1”来从 topic1 中提取数据 下面是数据在stream1中的样子:
+------------------------------------------------ -------------------------------------------------- ------------------------------------------------+ |数据| +------------------------------------------------ -------------------------------------------------- -----------------------------------------------------------+ |{COL1=dummy001, COL2=2023-01-01 01:01:01, COL3=1000.100} | |{COL1=dummy001, COL2=2023-02-02 02:02:02, COL3=2000.100}
我创建了另一个 Kafka 流“stream2”并将 col1 定义为键列。
ksql> 使用(kafka_topic='topic2', value_format='AVRO')创建流stream2 (col1 string key, col2 string, col3 string);
然后我将值从stream1插入到stream2中 ksql> 插入到流2中,通过数据->col1从流1分区中选择数据->COL1作为COL1,数据->COL2作为COL2,数据->COl3作为COL3;
我在topic2中的数据如下:
消息1: 关键:
虚拟001
价值:
{ “COL2”:{ “字符串”:“2023-01-01 01:01:01” }, “COL3”:{ “字符串”:“1000.100” } } 消息2: 钥匙: 假人001 值:
{ “COL2”:{ “字符串”:“2023-02-02 02:02:02” }, “COL3”:{ “字符串”:“2000.100” } }
|
我面临的问题是符合清理政策“COMPACT”的。尽管我已经设置了清理策略来压缩我的邮件,但我的邮件并未被删除。正如我上面提到的,我当前如何使用stream1和stream2处理从topic1到topic2的情况。即使将键列定义为 col1 后,清理策略也不起作用。 当我将清理策略设置为“压缩,删除”时,它会删除所有消息。
根据我的理解,由于消息有一个键列,因此它应该根据每条消息的键列保留最新消息并清除旧消息。但这不起作用。
如果我遗漏了什么,请告诉我。
我想要实现的是在 topic2 中的任何给定时间点根据 col1 保留最新消息。