如何生成每日重置的单调递增数字

问题描述 投票:0回答:1

所以我正在开发一个项目,该项目将非结构化日志转换为结构化日志,该日志以 2 秒的微批次的近实时流形式接收。
应用程序写入事件日志。每个日志的消息中都有一个字段,即事件 ID。使用此事件 ID,我们可以隔离不同的事件。
每个事件中需要提取的字段数量不同。例如,如果事件 id 为 1,那么通过解析该日志,我们将获得 5 个有用的字段。在事件 id 2 中,它可能是 10。此外,字段的含义也取决于事件。无论我们从事件 id 1 的位置 2 获得什么值,它都与我们从事件 id 2 的位置 2 获得的值绝对不同。导致每个事件 id 都有单独的模式。
我想将此事件存储在镶木地板中,因此我必须指定严格的架构。为此,我将所有事件隔离,然后将事件转换为适当的模式。但在这种情况下,我失去了所有事件到达或发生的顺序。我想这应该是数据结构化中非常正式和标准的问题。我们如何有效、可靠地应对这种情况。我正在使用logstash->kafka->spark-streaming堆栈。

我想到的一个解决方案是,我可以为每条消息添加递增计数器。由于某些原因,我无法在生成事件时添加这个不断增加的数字。我必须在logstash或kafka或spark-streaming中处理它。由于kafka和spark-streaming是分布式的。我们维护单序列计数器似乎很困难(也需要对此的建议)。

所以我想将它添加到logstash中。我想到了以下解决方案。拥有一个计数器并将其加一。但由于负载过重,一段时间内可能会超过这个数字。我们的更新非常少,所以一旦我启动 Logstash,单次启动可能会长达 5 年。这也可能导致达到最大限制。

Q1)所以我考虑实施计数器,使其在一天结束时重置为 0。因为我们的数据是按天分区的。尝试按以下方式实施,但缺点是每次我必须增加时,我都必须检查条件。有没有更好的办法来处理这种情况?

# I tested i have around 5000 messages coming in single second
seqCounter = 0

def increase():
   seqCounter +=1
   if (current_time == day_end_time): # checking this condition will add overhead it seems.
      seqCounter = 0

Q2)可靠性也存在一个问题。由于系统可能会失败或者由于某种原因需要重新启动logstash,那么也会丢失当前的序列。当它重新启动时,它将从 0 开始。为了处理这个问题,我们可以缓存当前序列,但我猜它会增加潜在的 io 延迟,因为每个增量数字都需要在文件中更新。它还会导致重复的序列号。有没有更好的办法来处理这种情况?

Q3)我可以使用 kafka 偏移量,但它是基于每个分区的。因此,如果单个主题将数据发送到多个分区(在我的情况下),偏移量将重复。有没有更好的办法来处理这种情况?

Q4)除了logstash还有其他解决方案来处理这种情况吗?

** 注意:在任何解决方案中,主要目标都是找回序列。序列号应该单调递增,以便我可以通过基于该序列号的排序来获取事件流。不需要连续的序列号。

如有任何帮助,我们将不胜感激。

apache-spark apache-kafka logstash streaming sequential
1个回答
0
投票

尝试在 Logstash 中生成此序列号仅当 Logstash 有一个单个工作人员(即

-w 1
)时才有效,否则如果有多个工作人员,事件将不会按照生成的顺序进行处理,而是并行处理,具体取决于有多少工人。

话虽这么说,通过强迫 Logstash 与单个工作人员一起工作,你会人为地减慢速度,所以这不是最好的主意。此外,正如您正确指出的那样,您需要处理 Logstash 崩溃或需要重新启动的情况。

该序列号确实需要是在 Logstash 外部生成的。我想到的第一个想法是使用Redis INCR,但没有官方

redis
过滤器插件

另一个想法是使用 SQL 序列(例如使用 Postgres),您可以使用

jdbc_streaming
过滤器插件轻松查询。

input {
   ...
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/jdbc-connector.jar"
    jdbc_driver_class => "com.product.jdbc.Driver"
    jdbc_connection_string => "jdbc:xxsql://localhost:1234/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT nextval('sequence');"
    target => "sequence"
  }
}

仅当您已经拥有支持序列的数据库时,这才是一个可行的选择,但如果您需要为此设置一个数据库,则这不是一个可行的选择。另外,根据每秒生成的事件数量,必须在远程数据库中查询序列号可能会成为瓶颈。

有些插件存在,例如这个,但序列值仅存在于内存中,如果Logstash重新启动,序列值将丢失。您还可以创建自己的 Java 过滤器插件来生成递增序列值,但您仍然需要找到一种方法来在重新启动时保留该值。

有一些持续存在的问题(例如#6997)已开始解决,但大多数都已经过时。

您拥有的最佳选择仍然是在创建事件时立即生成该序列号。

© www.soinside.com 2019 - 2024. All rights reserved.