流式数据过滤算法

问题描述 投票:0回答:1

我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。

这里我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,这里我需要根据一些多个过滤条件过滤这些事件并相应地路由它们,这些条件可以是逻辑 EQUALS、NOTEQUALS、IN、NOTIN

这里的问题是我想编写一个时间复杂度为 O(1) 的过滤器,同时寻找一些我遇到的布隆过滤器算法,但布隆过滤器的问题是它可能有误报,而且时间复杂度是 O(H) (这里 H 是哈希函数的数量)。

我正在考虑创建 Hashmap 或实现 Caffiene 缓存的另一种方法,其中键将是带有过滤条件值的排列组合的字符串值,这里我担心堆内存利用率,因为排列将随着 HashMap/Cache 大小而不断增加

考虑以下场景:

empId : 1
deptId : 2
filter_conditions = [
    {field: id, operaor:EQ, value: 1},
    {field: deptId, operaor:IN, value:1,2,3,4}
]

任何人都可以建议我可以采取的最佳方法

java algorithm apache-kafka apache-kafka-streams
1个回答
0
投票

我不确定你为什么要研究布隆过滤器等数据结构,但首先让我们谈谈你的 O(1) 要求。

我知道您希望过滤器速度快,但请考虑一下,由于您是从 Kafka 流中提取记录,所以您正在从流中读取所有记录并将它们从字节转换为 POJO,并且此操作至少需要 O(| R|) 每条记录的时间,其中 |R|是记录的大小(以字节为单位)。

您的过滤应用程序的运行速度不可能比这更快。由于您可以实现的最低复杂度是每条记录 O(|R|),因此您可以花费 |R|每次过滤操作的时间,并且仍然实现尽可能低的最坏情况复杂性。

如果您的条件都使用您列出的过滤运算符:IN、NOT IN、EQUALS、NOT EQUALS,那么您很幸运。您可以将任何条件列表转换为过滤器,您可以在 O(|R|) 时间内针对记录执行该过滤器。

您编译的过滤器应包含以下结构:

  • 必须存在的
    Set<String> requiredFields
    字段。这些都是带有 EQUALS 或 IN 条件的字段。
  • A
    Map<String, Condition> conditions
    ,对于过滤器中提到的每个字段,包含应用于该字段的条件。
  • 每个
    Condition
    都需要一个
    boolean inclusive
    Set<String> values
    inclusive == true
    表示该字段的值必须在
    values
    集中。
    inclusive == false
    表示该字段的值不能在
    values
    集合中。

请注意,无论特定字段有多少个条件,所有这些条件都可以组合成一个

Condition
对象。

使用这种表示,我们可以按与记录大小成正比的时间处理每条记录,而无需考虑过滤条件的数量或大小。

对于每条记录:

  1. 检查每个字段是否在
    requiredFields
    中。计算匹配项,如果小于
    requiredFields.size()
    ,则该记录未通过过滤器。
  2. 查找每个字段的
    Condition
    。如果有,则检查该值是否在
    values
    集合中,然后根据
    inclusive
    的值判断是否记录失败。

请注意,我们仅迭代记录中实际存在的字段,并对每个字段执行几次 O(1) 操作。我们不会迭代与条件的数量或大小有关的任何内容。这使得我们能够在 O(|R|) 时间内处理记录。

© www.soinside.com 2019 - 2024. All rights reserved.