我正在尝试找到一种方法来重新排序主题分区中的消息并将有序消息发送到新主题。
我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
此外,我们为每条消息添加一些消息密钥,以将它们发送到相应的分区。
我想要做的是根据消息的{system-timestamp}部分重新排序事件,并在1分钟的窗口内,因为我们的发布者不保证将根据{system-timestamp}值发送消息。
例如,我们可以向主题提供首先具有更大{system-timestamp}值的消息。
我已经调查了Kafka Stream API并找到了一些关于消息窗口化和聚合的例子:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/
但接下来我应该用这个分组流做什么?我没有看到任何'sort()(e1,e2) - > e1.compareTo(e2)'方法可用,windows也可以应用于像aggregation(),reduce(),count()这样的方法,但我认为我不需要任何消息数据操作。
如何在1分钟窗口中重新排序邮件并将其发送到其他主题?
这是一个大纲:
创建一个处理器实现:
这种方法的问题是如果没有新的msgs到达以推进“流时间”,则不会触发punctuate()。如果这是您的情况下的风险,您可以创建一个外部调度程序,向您的主题的每个(!)分区发送定期“滴答”消息,您的处理器应该忽略,但它们会在没有时触发标点符号“真实的”消息。 KIP-138将通过添加对系统时间标点符号的明确支持来解决此限制:https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
这是我在项目中订购流的方式。
这个逻辑对我来说很好。