使用Kafka流将状态持久化到Kafka

问题描述 投票:0回答:1

[我试图绕过Kafka Streams,并提出一些我似乎无法自行解决的基本问题。我了解KTable和Kafka州立商店的概念,但是在决定如何解决这一问题时遇到了麻烦。我也在使用Spring Cloud Streams,在此之上又增加了另一层次的复杂性。

我的用例:

我有一个规则引擎,可以读取Kafka事件,处理该事件,返回匹配的规则列表并将其写入另一个主题。这是我到目前为止的内容:

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}

public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
    // Does stuff
}

一些有状态规则看起来像:

[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]

我当前的实现方式是将所有这些信息存储在内存中,以便能够执行分析。由于明显的原因,它不容易扩展。所以我想我会坚持到卡夫卡州立商店。

我不确定执行此操作的最佳方法。我知道有一种创建自定义状态存储的方法,可以提供更高的灵活性。我不确定Kafka DSL是否支持此功能。

仍然是Kafka Streams的新手,不介意听到各种各样的建议。

java apache-kafka apache-kafka-streams spring-cloud-stream
1个回答
2
投票

根据您的描述,我相信仍然可以使用Kafka Streams中的DSL来实现此用例。上面显示的代码不会跟踪任何状态。在拓扑中,您​​需要通过跟踪规则的计数来添加状态并将其存储在状态存储中。然后,仅在该计数达到阈值时才需要发送输出规则。这是伪代码背后的一般思想。显然,您必须对此进行调整,才能满足用例的特定规范。

© www.soinside.com 2019 - 2024. All rights reserved.