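To my surprise, I realized that the suppress operator does not emit the final result when the window closes; it emits it only when another event is published on a partition handled by the same stream task. So how can the final aggregated result be emitted without a never-ending stream of events? In a CDC scenario we cannot wait for a subsequent database operation, which may happen much later, to emit the final result of the previous aggregation.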
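The idea is to schedule a future task that sends a special event, say one with the fixed key "FLUSH", whose timestamp falls outside the previous aggregation window. This "FLUSH" event is then filtered out after the suppress step.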
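A sketch of the timing arithmetic behind the FLUSH record, assuming epoch-aligned tumbling windows and the close rule applied by `suppress(untilWindowCloses(...))`: a window is final once stream time reaches window end plus grace. `FlushMath` and its methods are hypothetical helpers, not Kafka Streams API; all times are epoch milliseconds.

```java
import java.time.Duration;

// Hypothetical helper illustrating the FLUSH timing; not part of Kafka Streams.
final class FlushMath {

    // Start of the epoch-aligned tumbling window that contains ts.
    static long windowStart(long ts, Duration size) {
        return ts - Math.floorMod(ts, size.toMillis());
    }

    // Exclusive end of that window.
    static long windowEnd(long ts, Duration size) {
        return windowStart(ts, size) + size.toMillis();
    }

    // Timestamp for the FLUSH record: last event + size + grace + 1 s, as in the post.
    static long flushTimestamp(long lastEventTs, Duration size, Duration grace) {
        return lastEventTs + size.toMillis() + grace.toMillis() + 1_000L;
    }

    // Assumed close rule: the window is final once stream time >= window end + grace.
    static boolean isClosed(long windowEnd, Duration grace, long streamTime) {
        return streamTime >= windowEnd + grace.toMillis();
    }
}
```

Since the window containing `ts` ends at most `size` after `ts`, `ts + size + grace` would already be enough; the extra second works as a safety margin.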
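For every record the stream sees, a "FLUSH" event is scheduled, replacing the previously scheduled one if it has not started yet, which keeps unnecessary "FLUSH" events to a minimum.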
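Replacing a pending FLUSH with a newer one is a debounce. A minimal, self-contained sketch with a `ScheduledExecutorService`, where the class name `FlushDebouncer` is hypothetical: each call cancels the previous task if it has not started and schedules a new one, so a burst of records results in a single flush.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical debounce helper; not part of Kafka Streams.
final class FlushDebouncer {
    private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pending;

    // Runs action after delayMs, replacing the previous action if it has not started yet.
    synchronized void reschedule(Runnable action, long delayMs) {
        if (pending != null) {
            pending.cancel(false); // no effect if the task already ran or is running
        }
        pending = ses.schedule(action, delayMs, TimeUnit.MILLISECONDS);
    }

    // Stops accepting tasks; delayed tasks that were not cancelled still run (default policy).
    void shutdown() {
        ses.shutdown();
        try {
            ses.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling `reschedule` five times in quick succession with a 200 ms delay runs the action once, about 200 ms after the last call.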
In this example I use a tumbling window, but conceptually the approach also works for other window types.
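For hopping windows the same delay still works, assuming Kafka's epoch-aligned hopping windows (size S, advance A): every window containing `ts` starts at a multiple of A in (ts - S, ts], so the latest of them ends no later than ts + S. The hypothetical `HoppingCheck` below only covers hopping windows; other window types are not checked here.

```java
// Hypothetical check for hopping windows; all times in epoch milliseconds.
final class HoppingCheck {

    // Latest end among the hopping windows that contain ts.
    static long latestWindowEnd(long ts, long sizeMs, long advanceMs) {
        long lastStart = ts - Math.floorMod(ts, advanceMs); // last aligned start <= ts
        return lastStart + sizeMs;
    }

    // True if a FLUSH at ts + size + grace + 1 s reaches end + grace of every window containing ts.
    static boolean flushClosesAll(long ts, long sizeMs, long advanceMs, long graceMs) {
        long flushTs = ts + sizeMs + graceMs + 1_000L;
        return flushTs >= latestWindowEnd(ts, sizeMs, advanceMs) + graceMs;
    }
}
```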
So, suppose we have a topic "user" and want to aggregate all records that fall within a 10-second tumbling window into a list.
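An in-memory model of the result the aggregation should produce for a single key, assuming epoch-aligned 10-second tumbling windows. `TumblingListAggregation` and `Event` are hypothetical stand-ins for the records of the `user` topic; grace, suppression and late records are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the windowed list aggregation; not Kafka code.
final class TumblingListAggregation {
    static final long WINDOW_MS = 10_000L;

    static final class Event {
        final String name;
        final long timestampMs;

        Event(String name, long timestampMs) {
            this.name = name;
            this.timestampMs = timestampMs;
        }
    }

    // Maps each window start to the names of the records in that window, in arrival order.
    static Map<Long, List<String>> aggregate(List<Event> events) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : events) {
            long start = e.timestampMs - Math.floorMod(e.timestampMs, WINDOW_MS);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.name);
        }
        return windows;
    }
}
```

A record at 9 999 ms still belongs to the window starting at 0, while one at 10 000 ms opens the next window.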
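The models are:
User.java
public class User {
    private String name;
    private String surname;
    private String timestamp; // format yyyy-MM-dd'T'HH:mm:ss.SSSZ, e.g. 2021-03-01T10:00:05.000+0000
    // getters and setters for name and surname omitted
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
}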
UserGrouped.java
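import java.util.ArrayList;
import java.util.List;
public class UserGrouped {
    private List<User> userList = new ArrayList<>();
    public List<User> getUserList() { return userList; } // mutated in place by the aggregator
}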
Topology
...
KStream<String, User> userEvents = builder.stream(userTopic, userConsumerOptions);
TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(windowDuration))
        .grace(Duration.ofSeconds(windowGracePeriod));
KStream<String, UserGrouped> userGroupedStream = userEvents
        .peek((key, value) -> {
            // Skip the FLUSH events themselves, otherwise each flush would schedule another one
            // ("FLUSH".equalsIgnoreCase(key) also avoids an NPE on null keys)
            if (!"FLUSH".equalsIgnoreCase(key)) {
                // For each event, schedule a future task that will send a FLUSH event
                // to every partition assigned to this stream thread
                scheduleFlushEvent(value.getTimestamp());
            }
        })
        .groupByKey()
        .windowedBy(tumblingWindow)
        .aggregate(
                // INITIALIZER
                () -> new UserGrouped(),
                // AGGREGATOR
                (key, user, userGrouped) -> {
                    userGrouped.getUserList().add(user);
                    return userGrouped;
                },
                // WINDOW STORE
                Materialized.<String, UserGrouped, WindowStore<Bytes, byte[]>>as("userGroupedWindowStore")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerdes.UserGrouped()) // custom Serde
        )
        // Emit only the final result of each window, once stream time passes window end + grace
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded().shutDownWhenFull()))
        .toStream((windowedKey, value) -> windowedKey.key())
        // Discard the aggregates of the FLUSH key
        .filterNot((key, value) -> "FLUSH".equalsIgnoreCase(key))
        .peek((key, value) -> {
            int sizeList = value != null && value.getUserList() != null ? value.getUserList().size() : 0;
            log.info("#### USER GROUPED KEY: {}, Num of elements: {}", key, sizeList);
        });
Scheduler method
// Relies on fields of the enclosing class that are not shown: windowDuration and windowGracePeriod
// (seconds), userTopic, userFlushProducer (KafkaProducer<String, String>), ses (ScheduledExecutorService),
// scheduledFuture (ScheduledFuture<List<RecordMetadata>>) and log.
private void scheduleFlushEvent(String lastEventTimestamp) {
    // Add 1 second to (windowDuration + windowGracePeriod) so the flush event falls after the last window closes
    long delay = windowDuration + windowGracePeriod + 1;
    // FIND THE PARTITIONS ASSIGNED TO THE CURRENT STREAM THREAD.
    // The assignment may change after a rebalance, so it is read on every call.
    // In a Spring context you can instead keep a 'partitionList' field of a @Component
    // up to date from a rebalance listener.
    // StreamThread is an internal Kafka Streams class: this cast depends on internals and
    // only works because peek() runs on the stream thread.
    Set<Integer> partitionList = new HashSet<>();
    StreamThread currThread = (StreamThread) Thread.currentThread();
    for (TaskMetadata taskMetadata : currThread.threadMetadata().activeTasks()) {
        for (TopicPartition topicPartition : taskMetadata.topicPartitions()) {
            partitionList.add(topicPartition.partition());
        }
    }
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
    Callable<List<RecordMetadata>> task = () -> {
        try {
            List<RecordMetadata> recordMetadataList = new ArrayList<>();
            Instant flushInstant = Instant.from(formatter.parse(lastEventTimestamp)).plusSeconds(delay);
            String flushEventTimestamp = formatter.withZone(ZoneId.systemDefault()).format(flushInstant);
            // The flush only closes the window if Kafka Streams uses this time as the record timestamp,
            // e.g. through a TimestampExtractor that reads User.timestamp (assumed, not shown here)
            User userFlush = new User();
            userFlush.setTimestamp(flushEventTimestamp);
            String userFlushValue = new String(JsonSerdes.User().serializer().serialize(userTopic, userFlush),
                    java.nio.charset.StandardCharsets.UTF_8);
            // SEND THE FLUSH EVENT TO EVERY PARTITION ASSIGNED TO THE STREAM THREAD
            for (Integer partition : partitionList) {
                ProducerRecord<String, String> userRecord =
                        new ProducerRecord<>(userTopic, partition, "FLUSH", userFlushValue);
                RecordMetadata recordMetadata = userFlushProducer.send(userRecord).get();
                recordMetadataList.add(recordMetadata);
                log.info("SENT FLUSH EVENT PARTITION: {}, VALUE: {}", partition, userFlushValue);
            }
            return recordMetadataList;
        } catch (Exception e) {
            log.error("ERROR", e);
            return null;
        }
    };
    // Replace the pending flush, if it has not started yet, with one based on the latest event.
    // cancel(false) has no effect on a task that is already running, so a new task is scheduled
    // in every case; otherwise the latest window could be left without a flush.
    // Assumes one stream thread; with several threads, keep one scheduled future per thread.
    if (scheduledFuture != null && !scheduledFuture.isDone()) {
        boolean cancelled = scheduledFuture.cancel(false);
        log.debug(cancelled ? "task RE-scheduled" : "previous task already running, scheduling a new one");
    } else {
        log.debug("task scheduled");
    }
    scheduledFuture = ses.schedule(task, delay, TimeUnit.SECONDS);
}
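The timestamp handling inside the scheduled task can be tried in isolation. This standalone sketch uses the same pattern as the post; the class `FlushTimestampFormat` is hypothetical, and the output zone is a parameter instead of `ZoneId.systemDefault()` so the result is reproducible.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical standalone version of the flush-timestamp computation.
final class FlushTimestampFormat {
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    // Parses an offset timestamp, adds delaySeconds and formats the instant in the given zone.
    static String shift(String timestamp, long delaySeconds, ZoneId zone) {
        Instant instant = Instant.from(FORMAT.parse(timestamp)).plusSeconds(delaySeconds);
        return FORMAT.withZone(zone).format(instant);
    }
}
```

With a 10 s window and 5 s grace the delay is 16 s, so `shift("2021-03-01T10:00:05.000+0000", 16, ZoneOffset.UTC)` gives `2021-03-01T10:00:21.000+0000`. The pattern letter `Z` parses and prints offsets such as `+0100`, so inputs in other offsets are normalized to the target zone.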
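The reason suppress behaves this way is that Kafka Streams works with "stream time": time advances only through the events consumed from the topics, and Kafka Streams does not keep a wall clock. The current time of a Kafka Streams task is the largest record timestamp it has seen so far. That explains the behavior you noticed: only when the task receives a new event does it advance its time to that event's timestamp and discover that the previous window should be closed.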
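A toy model of a single stream task makes this concrete. It assumes epoch-aligned tumbling windows and the close rule "stream time >= window end + grace"; `StreamTimeModel` is hypothetical and only counts records per key and window instead of building lists. Stream time moves only inside `process`, so a buffered window stays unemitted until a later record, such as a FLUSH, arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of stream time plus suppress(untilWindowCloses); not Kafka code.
final class StreamTimeModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // window start -> (key -> record count): the suppression buffer
    private final TreeMap<Long, TreeMap<String, Integer>> buffer = new TreeMap<>();
    // final results that passed the FLUSH filter, formatted as "key@windowStart=count"
    final List<String> emitted = new ArrayList<>();

    StreamTimeModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(String key, long ts) {
        streamTime = Math.max(streamTime, ts); // no wall clock: only records move time forward
        long start = ts - Math.floorMod(ts, sizeMs);
        if (start + sizeMs + graceMs > streamTime) { // window still open: aggregate
            buffer.computeIfAbsent(start, s -> new TreeMap<>()).merge(key, 1, Integer::sum);
        } // otherwise the record is late for a closed window and is dropped
        // emit every buffered window that the new stream time has closed
        while (!buffer.isEmpty() && buffer.firstKey() + sizeMs + graceMs <= streamTime) {
            Map.Entry<Long, TreeMap<String, Integer>> window = buffer.pollFirstEntry();
            for (Map.Entry<String, Integer> e : window.getValue().entrySet()) {
                if (!"FLUSH".equals(e.getKey())) { // same role as filterNot(... "FLUSH")
                    emitted.add(e.getKey() + "@" + window.getKey() + "=" + e.getValue());
                }
            }
        }
    }
}
```

With a 10 s window and 5 s grace, two records at 1 s and 4 s stay buffered indefinitely; a FLUSH record at 20 s (4 + 10 + 5 + 1) pushes stream time past 15 s and releases `alice@0=2`, while the FLUSH window itself stays buffered and is later filtered out.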