使用 Logstash Aggregate Filter 插件来处理可能或可能没有排序的数据。

问题描述 投票:0回答:1

大家好!我正试图使用Logstash v7.7的聚合过滤器插件来关联和合并来自两个不同CSV文件输入的数据,这两个文件代表了API数据调用。

我正试图使用 Logstash v7.7 的 Aggregate 过滤器插件来关联和合并来自两个不同 CSV 文件输入的数据,这两个文件代表 API 数据调用。我们的想法是产生一个显示组合图片的记录。正如你所期望的那样,数据可能会或可能不会以正确的顺序到达。

下面是一个例子。

dataincomingsource_1*.csv:

StartTime,AckTime,Operation,RefData1,RefData2,OpSpecificData1 2313232,44343545,Register,ref-Data-1a,ref-Data-2a,OpSpecific-Data-1 979898999,75758383,Register,ref-Data-1b,ref-Data-2b,OpSpecific-Data-2 354656466,98554321,Cancel,ref-Data-1c,ref-Data-2c,OpSpecific-Data-2。

dataincomingsource_1*.csv。

完成时间,操作,RefData1,RefData2,FinishSpecificData 67657657575,取消,ref-Data-1c,ref-Data-2c,FinishSpecific-Data-1 68445590877,注册,ref-Data-1a,ref-Data-2a,FinishSpecific-Data-2 55443444313,注册,ref-Data-1a,ref-Data-2a,FinishSpecific-Data-2。

我有一个单一的流水线,同时接收这两个CSV,我可以将它们作为单独的记录处理并写入一个索引。然而,我们的想法是将两个来源的记录合并成一条记录,每条记录代表一个与操作相关的信息的超集。

遗憾的是,尽管我多次尝试,但一直无法弄清楚如何通过Aggregate过滤插件来实现。我的首要问题是,这是否适合使用特定的插件?如果是,欢迎提出任何建议!

目前,我有这个

input {
   file {
      path => ['/data/incoming/source_1/*.csv']
      tags => ["source1"]
   }
   file {
      path => ['/data/incoming/source_2/*.csv']
      tags => ["source2"]
   }
   # use the tags to do some source 1 and 2 related massaging, calculations, etc

   aggregate {
         task_id = "%{Operation}_%{RefData1}_%{RefData1}"
         code => "
             map['source_files'] ||= []
             map['source_files'] << {'source_file', event.get('path') }
         "
         push_map_as_event_on_timeout => true
         timeout => 600 #assuming this is the most far apart they will arrive         
   }
  ...
}
output {
    elastic { ...}
}

以及其他类似的变化。然而,我不断地将单个记录写入索引,却无法将一个记录合并。然而,正如你从数据集中看到的那样,记录的顺序是无法保证的--所以我想知道,从一开始,过滤器是否是合适的工具?

或者是我不能正确地使用它!;-)

不管是哪种情况,都欢迎任何意见和建议。谢谢!

PS:这条消息被 转贴自 Elastic论坛。我在那里提供一个链接,以防那里也出现一些答案。

logstash elastic-stack logstash-configuration
1个回答
0
投票

答案是在upsert模式下使用Elastic搜索。请看具体内容 此处..

© www.soinside.com 2019 - 2024. All rights reserved.