Kafka Streams: how to emit the final result of a windowed aggregation using the suppress operator


To my surprise, I realized that the "suppress" operator does not emit the last event when the window closes, but only when another event is published on the partition the stream task belongs to. So, how can the final aggregated result be emitted without a never-ending stream of events? In a CDC pattern we cannot wait for a subsequent database operation, which may happen a long time later, to emit the final result of the previous aggregation.

The idea is to schedule a FutureTask that sends a specific event, say one with a fixed "FLUSH" key, whose timestamp falls outside the previous aggregation window. This "FLUSH" event will then be filtered out after the suppress step.

For every record peeked from the stream, a "FLUSH" event is scheduled, eventually replacing a previously scheduled one that has not started yet, so as to minimize unnecessary "FLUSH" events.

In this example I used a tumbling window, but conceptually it works with other window kinds too.

So, suppose we have a topic "user" and want to aggregate into a list all the records that fall into a 10-second tumbling window.

The models are:

User.java

public class User {

    private String name;
    private String surname;
    private String timestamp;

    //getters and setters omitted for brevity
    //(the topology below uses getTimestamp() and setTimestamp())
}

UserGrouped.java

public class UserGrouped {

    private List<User> userList = new ArrayList<User>();

    //getter omitted for brevity (the topology below uses getUserList())
}
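The topology below also references a custom JsonSerdes factory that the post does not show. A minimal sketch of what it could look like, assuming Jackson is used for (de)serialization (the actual implementation may differ):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public final class JsonSerdes {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Serde<User> User() { return serdeFor(User.class); }
    public static Serde<UserGrouped> UserGrouped() { return serdeFor(UserGrouped.class); }

    //Build a Serde that maps a POJO to/from JSON bytes
    private static <T> Serde<T> serdeFor(Class<T> cls) {
        Serializer<T> serializer = (topic, data) -> {
            try { return data == null ? null : MAPPER.writeValueAsBytes(data); }
            catch (Exception e) { throw new RuntimeException(e); }
        };
        Deserializer<T> deserializer = (topic, bytes) -> {
            try { return bytes == null ? null : MAPPER.readValue(bytes, cls); }
            catch (Exception e) { throw new RuntimeException(e); }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}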

The topology:

...
KStream<String, User> userEvents = builder.stream(userTopic, userConsumerOptions);

TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(windowDuration))
                                        .grace(Duration.ofSeconds(windowGracePeriod));


KStream<String,UserGrouped> userGroupedStream = userEvents
             .peek( (key,value) -> {
                    //Filter out the previous "flush" event to avoid a scheduler loop
                    if (!key.equalsIgnoreCase("FLUSH")) {
                        //For each event, schedule a future task that will send
                        //a "flush" event to every partition assigned to this stream.
                         scheduleFlushEvent(value.getTimestamp());
                    }
                })
            .groupByKey()
            .windowedBy(tumblingWindow)
            .aggregate( 
                        //INITIALIZER
                        () -> new UserGrouped(),
                        //AGGREGATOR
                        (key, user, userGrouped) -> {
                            userGrouped.getUserList().add(user);
                            return userGrouped;
                        },
                        //STREAM STORE
                        Materialized.<String,UserGrouped,WindowStore<Bytes, byte[]>>
                                    as("userGroupedWindowStore")
                                    .withKeySerde(Serdes.String())
                                    .withValueSerde(JsonSerdes.UserGrouped()) //Custom Serdes
                                    )
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded().shutDownWhenFull()))
            .toStream( (windowedKey,value) ->  windowedKey.key())
            //Discard the flush event
            .filterNot((key,value) -> key.equalsIgnoreCase("FLUSH"))
            .peek( (key, value) -> {
                int sizeList = value != null && value.getUserList() != null ? value.getUserList().size() : 0;
                log.info("#### USER GROUPED KEY: {}, Num of elements: {}",key, sizeList);
            })
        ;
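Note that the "FLUSH" record's timestamp field can only advance stream time if the stream uses a timestamp extractor that reads it; with the default extraction, windowing follows the record timestamps set at produce time. A possible extractor, assuming the same timestamp format used in scheduleFlushEvent below, wired in through the userConsumerOptions referenced above (this is an assumption, the post does not show it):

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class UserTimestampExtractor implements TimestampExtractor {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        User user = (User) record.value();
        //Fall back to the partition time if the record carries no timestamp
        if (user == null || user.getTimestamp() == null) {
            return partitionTime;
        }
        return Instant.from(FORMATTER.parse(user.getTimestamp())).toEpochMilli();
    }
}

//and the consumer options referenced as 'userConsumerOptions' above:
Consumed<String, User> userConsumerOptions = Consumed
        .with(Serdes.String(), JsonSerdes.User())
        .withTimestampExtractor(new UserTimestampExtractor());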

The scheduler method:


private void scheduleFlushEvent(String lastEventTimestamp) {
         
         //add 1 second to (windowDuration + windowGracePeriod) to ensure
         //that the flush event falls outside the last window
         Long delay = Long.valueOf(windowDuration + windowGracePeriod + 1);
         
         //FIND PARTITIONS ASSIGNED TO THE CURRENT STREAM.
         //The partitions assigned may change after rebalance events,
         //so I need to get them in every iteration.
         //In a Spring context you can use a RebalanceListener to update a 'partitionList'
         //field of this class defined with @Component annotation
         Set<Integer> partitionList = new HashSet<Integer>();
         StreamThread currThread = (StreamThread)Thread.currentThread();
         for (TaskMetadata taskMetadata : currThread.threadMetadata().activeTasks()) {
            for(TopicPartition topicPartition : taskMetadata.topicPartitions()) {
                partitionList.add(topicPartition.partition());
            }
         }
         
         Callable<List<RecordMetadata>> task = () -> {
             try {
                List<RecordMetadata> recordMetadataList = new ArrayList<RecordMetadata>();
             
                Instant instant = Instant.from(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                                                                .parse(lastEventTimestamp));
                instant = instant.plusSeconds(delay);
                String flushEventTimestamp = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                                                              .withZone(ZoneId.systemDefault() )
                                                              .format(instant);
                
                User userFlush = new User();
                userFlush.setTimestamp(flushEventTimestamp);
                String userFlushValue = new String(JsonSerdes.User().serializer().serialize(userTopic, userFlush));
                
                //SEND FLUSH EVENT TO ALL PARTITIONS ASSIGNED TO THE STREAM THREAD
                for(Integer partition : partitionList) {
                    ProducerRecord<String,String> userRecord = new ProducerRecord<String, String>(userTopic, partition, "FLUSH", userFlushValue);
                    RecordMetadata recordMetadata = userFlushProducer.send(userRecord).get();
                    recordMetadataList.add(recordMetadata);
                    log.info("SENT FLUSH EVENT PARTITION: {}, VALUE: {}",partition, userFlushValue);
                }
                
                return recordMetadataList;
             } catch (Exception e) {
                 log.error("ERROR", e);
                 return null;
             }
                
         };
        
        //TASK NOT SCHEDULED YET
        if(scheduledFuture == null
        || scheduledFuture.isDone()) {
            log.debug("task scheduled");
            scheduledFuture = ses.schedule(task, delay, TimeUnit.SECONDS);
        
        //TASK ALREADY SCHEDULED.
        //Cancel the previously scheduled task and schedule a new one with a postponed delay.
        } else {
            if(!scheduledFuture.isDone()
             && scheduledFuture.cancel(false)) {
                log.debug("task RE-scheduled");
                scheduledFuture = ses.schedule(task, delay, TimeUnit.SECONDS);
            } else {
                log.warn("task not RE-scheduled");
            }
        }
     }
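For completeness, the method above relies on a few fields that the post does not show. Declarations along these lines would make it compile (the names are taken from the snippet; the actual definitions are an assumption):

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;

//Assumed supporting fields for scheduleFlushEvent (not shown in the original post)
private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<List<RecordMetadata>> scheduledFuture;

//Plain producer used only for the FLUSH events; String serializers on both sides,
//since the flush value is the pre-serialized JSON string
private KafkaProducer<String, String> userFlushProducer;

private long windowDuration;      //window size in seconds
private long windowGracePeriod;   //grace period in seconds
private String userTopic;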
1 Answer

To my surprise, I realized that the "suppress" operator does not emit the last event when the window closes, but only when another event is published on the partition the stream task belongs to. So, how can the final aggregated result be emitted without a never-ending stream of events?

This is because Kafka Streams follows the concept of "stream time": time advances only through the events consumed from the topic, and Kafka Streams does not maintain a wall clock. The current time in a Kafka Streams task is the highest record timestamp seen so far. Hence the behavior you noticed: only when the task receives a new event does it advance time, using the event's timestamp, and discover that the previous window should be closed.
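A quick way to observe this is with TopologyTestDriver, where record timestamps alone drive stream time. The sketch below is hypothetical: it assumes the topology from the question is extended with a sink, e.g. userGroupedStream.to("user-grouped", ...), and that the default record-timestamp extraction is in use:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, User> input = driver.createInputTopic(
            "user", Serdes.String().serializer(), JsonSerdes.User().serializer());
    TestOutputTopic<String, UserGrouped> output = driver.createOutputTopic(
            "user-grouped", Serdes.String().deserializer(), JsonSerdes.UserGrouped().deserializer());

    //Two records inside the same 10-second window: suppress emits nothing yet,
    //even though the window end may have long passed in wall-clock terms.
    input.pipeInput("alice", new User(), 1_000L);
    input.pipeInput("alice", new User(), 5_000L);
    System.out.println(output.isEmpty()); //true: stream time is still at 5s

    //A record with a later timestamp advances stream time past the window
    //end plus grace, so the suppressed result for "alice" is finally emitted.
    input.pipeInput("bob", new User(), 60_000L);
    System.out.println(output.readKeyValuesToList());
}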
