Logstash configuration issue when parsing multiline logs


I have the multiline log below, which I am trying to parse with my Logstash configuration.

2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
                    [email protected]
                    NAME=abc.def
                    PAGER=+11111111111111
                    DATE=2020-05-27
                    AUTHOR=
                    COMMENT=
                    ADDRESS=1.1.1.1
                    ALIAS=abc.example.com
                    ATTEMPT=1
2020-05-27 11:59:17 Previous service hard state not known. Allowing all states.
2020-05-27 11:59:17 Computed variables:
                    URL=abc.example.com
                    STATE=UP                
2020-05-27 11:59:17 Preparing flexible notifications for abc.def
2020-05-27 11:59:17  channel with plugin sms
2020-05-27 11:59:17  - Skipping: set
2020-05-27 11:59:17  channel with plugin plain email        
2020-05-27 11:59:20 --------------------------------------------------------------------

This is my Logstash configuration:

input {
  stdin { }
}

filter {
  grok {
    match => { "message" => "(?m)%{GREEDYDATA:data}" }
  }
  if [data] {
    mutate {
      gsub => [
        "data", "^\s*", ""
      ]
    }
    mutate {
      gsub => ['data', "\n", " "]
    }
  }
}

output {
  stdout { codec => rubydebug }
}

Filebeat configuration:

  multiline.pattern: '^[[:space:]][A-Za-z]* (?m)'
  multiline.negate: false
  multiline.match: after

What I want to achieve: the multiline log should first be matched by the multiline pattern and split into messages such as

  Message1:  2020-05-27 11:59:17 ----------------------------------------------------------------------

  Message2: 2020-05-27 11:59:17 Got context

  Message3:  2020-05-27 11:59:17 Raw context:
                        [email protected]
                        NAME=abc.def
                        PAGER=+11111111111111
                        DATE=2020-05-27
                        AUTHOR=
                        COMMENT=
                        ADDRESS=1.1.1.1
                        ALIAS=abc.example.com
                        ATTEMPT=1

After that, when these message lines are parsed, they should be split again on the delimiter so that I can use the kv filter to read each key-value pair within a single message (for example, ALIAS=abc.example.com in message 3).

Can you suggest how to achieve this?

elasticsearch logstash logstash-configuration filebeat
1 Answer

I suggest you use the multiline codec when reading from the file (if you are reading from stdin, you can attach the codec to that input as well), with a pattern that treats every line prefixed by a timestamp as the start of a new event.
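For reference, a minimal sketch of that stdin variant (an assumption based on the stdin input in your question; the full file-based configuration follows below):

input {
  stdin {
    # Any line that does NOT start with an ISO8601 timestamp is folded
    # into the previous event, so the indented KEY=VALUE lines stay
    # attached to the message that introduced them.
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
}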

Then use the kv filter to split the fields and values, like this:

input {
    file {
        path => "C:/work/elastic/logstash-6.5.0/config/test.txt"
        start_position => "beginning"
        # Lines that do not start with a timestamp are appended to the
        # previous event, producing one event per log block
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => "previous"
        }
    }
}
filter {
    # Split each event on line breaks and extract KEY=VALUE pairs into fields
    kv {
        field_split => "\r\n"
        value_split => "="
        source => "message"
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "test"
    }
}
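For local testing, you can swap the elasticsearch output for the rubydebug stdout codec from your own question to inspect the extracted fields before shipping anything:

output {
  stdout { codec => rubydebug }   # print each parsed event with all its extracted fields
}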

The result in Kibana: each multiline block is indexed as a single document, with every KEY=VALUE pair extracted into its own field (screenshots omitted).

Edit: In the comments you said you could not see the full values when they contained spaces. I retested my solution with the new STATE value you provided (including the trailing spaces) and it works fine (screenshot omitted).
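If the trailing whitespace on a padded value such as STATE=UP is itself unwanted, the kv filter's trim_value option can strip it; a minimal sketch as an optional addition to the filter above:

filter {
  kv {
    field_split => "\r\n"
    value_split => "="
    source => "message"
    trim_value => " "   # strip leading/trailing spaces from each extracted value
  }
}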
