我想实现的是根据消息中存在的时间戳来获取记录中存在的每条消息的数量。每条记录包括 List<Metric>
对象的时间戳。我想提取每个度量的时间戳,并根据度量名称聚合度量。
度量值
public class Metric {
String metric;
Long timestamp;
Double value;
}
自定义时间戳提取器
我已经实现了这个时间戳提取器,它可以将记录转换为List对象。而目前它获取的第一个时间戳,为这个ArrayList做了窗口化处理。
public class EventTimestampExtractor implements TimestampExtractor {
public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
try {
// Have a ListSerde in place to deserialize the record to a List<Metric> object.
final List<Metric> value = (List<Metric>) record.value();
final Metric metric = value.get(0); // Returning the first timestamp from the metric list.
return metric.getTimestamp();
}
catch (Exception e) {
// If there is an exception, return back the event time.
return record.timestamp();
}
}
}
拓扑结构
一旦我获取了列表,我就会执行FlatTransform来转换这个List,并基于扁平化的List执行聚合。
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(),new MetricListSerde()));
TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));
stream.filter((key, value) -> value != null)
.flatTransform(() -> new MetricsTransformer()) // Flat transforming the list to single metrics
.groupByKey()
.windowedBy(windows)
.count()
.toStream()
.to("output-topic");
度量列表示例 - 如果你注意到有一个单一的度量和3个计数(2个在0-10之间,1个在10秒后)。
[{ "metric": "metric1.count",
"timestamp": 1,
"value": 30
},{
"metric": "metric1.count",
"timestamp": 2,
"value": 30
}, {
"metric": "metric1.count",
"timestamp": 15,
"value": 30
}]
我的窗口是10秒,我想做一个获取计数的指标。我的 当前结果 看起来像
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3 aggregator: count interval: "10s"}
预期结果 -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2 aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1 aggregator: count interval: "10s"}
很抱歉这个问题很长,但是有什么方法可以从一个包含消息集合的单一记录中提取多个时间戳?
Kafka Streams版本 - 2.4.1
的 TimestampExtractor
对你的用例没有帮助,因为它只能给你一个时间戳。使用 flatMap()
所有输出记录都继承了输入记录的时间戳。
如果你需要即时修改时间戳,你需要使用 transform()
来实现 "平面图"。对于每个输入记录,你可以调用 context.forward()
多次来做实际的平面映射(你可以只用 return null;
在最后不再发出任何额外的记录)。) 在每个 forward()
调用,你可以通过 To.all().withTimestamp(...)
:
public KeyValue transform(K key, V value) {
for (...) {
context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp);
}
return null;
}