从 Kafka 获取最新值

问题描述 投票:0回答:1

我有一个名为 A 的 Kafka 主题。

主题A中的数据格式为:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

现在在消费者方面,我只想获取一小时窗口的最新数据,这意味着每一小时我都需要根据created_at从主题中获取最新值

我的预期输出是:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

我认为这可以通过 ksql 解决,但我不确定。请帮助我。

提前致谢。

apache-kafka apache-kafka-streams ksqldb
1个回答
4
投票

是的,您可以使用 KSQL 来实现此目的。尝试以下操作:

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAR) 
WITH (kafka_topic = 'topic_name', value_format = 'JSON');
CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) 
AS created_at 
FROM s1 
WINDOW TUMBLING (size 1 hour) 
GROUP BY id, name;

结果将具有 Linux 时间戳格式的

created_at
时间。您可以在新查询中使用
TIMESTAMPTOSTRING
UDF 将其更改为您想要的格式。

如果您发现任何问题,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.