我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。
这里我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,这里我需要根据一些多个过滤条件过滤这些事件并相应地路由它们,这些条件可以是逻辑 EQUALS、NOTEQUALS、IN、NOTIN
这里的问题是我想编写一个时间复杂度为 O(1) 的过滤器,同时寻找一些我遇到的布隆过滤器算法,但布隆过滤器的问题是它可能有误报,而且时间复杂度是 O(H) (这里 H 是哈希函数的数量)。
我正在考虑创建 Hashmap 或实现 Caffiene 缓存的另一种方法,其中键将是带有过滤条件值的排列组合的字符串值,这里我担心堆内存利用率,因为排列将随着 HashMap/Cache 大小而不断增加
考虑以下场景:
empId : 1
deptId : 2
filter_conditions = [
{field: id, operaor:EQ, value: 1},
{field: deptId, operaor:IN, value:1,2,3,4}
]
任何人都可以建议我可以采取的最佳方法
我不确定你为什么要研究布隆过滤器等数据结构,但首先让我们谈谈你的 O(1) 要求。
我知道您希望过滤器速度快,但请考虑一下,由于您是从 Kafka 流中提取记录,所以您正在从流中读取所有记录并将它们从字节转换为 POJO,并且此操作至少需要 O(| R|) 每条记录的时间,其中 |R|是记录的大小(以字节为单位)。
您的过滤应用程序的运行速度不可能比这更快。由于您可以实现的最低复杂度是每条记录 O(|R|),因此您可以花费 |R|每次过滤操作的时间,并且仍然实现尽可能低的最坏情况复杂性。
如果您的条件都使用您列出的过滤运算符:IN、NOT IN、EQUALS、NOT EQUALS,那么您很幸运。您可以将任何条件列表转换为过滤器,您可以在 O(|R|) 时间内针对记录执行该过滤器。
您编译的过滤器应包含以下结构:
Set<String> requiredFields
字段。这些都是带有 EQUALS 或 IN 条件的字段。Map<String, Condition> conditions
,对于过滤器中提到的每个字段,包含应用于该字段的条件。Condition
都需要一个 boolean inclusive
和 Set<String> values
。 inclusive == true
表示该字段的值必须在values
集中。 inclusive == false
表示该字段的值不能在values
集合中。请注意,无论特定字段有多少个条件,所有这些条件都可以组合成一个
Condition
对象。
使用这种表示,我们可以按与记录大小成正比的时间处理每条记录,而无需考虑过滤条件的数量或大小。
对于每条记录:
requiredFields
中。计算匹配项,如果小于requiredFields.size()
,则该记录未通过过滤器。Condition
。如果有,则检查该值是否在values
集合中,然后根据inclusive
的值判断是否记录失败。请注意,我们仅迭代记录中实际存在的字段,并对每个字段执行几次 O(1) 操作。我们不会迭代与条件的数量或大小有关的任何内容。这使得我们能够在 O(|R|) 时间内处理记录。