Elasticsearch: duplicates caused by overwritten log files


I am running the ELK stack. A simple Java application writes out its log files every 5 minutes, and Filebeat then ships them to Logstash. Because the files are overwritten, the same messages get indexed again and again (their fingerprints are identical); the only difference is the document ID, since Elasticsearch assigns a new ID every time a document is re-indexed. How can I eliminate the duplicates, or keep the document ID the same?
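As I understand it, Elasticsearch auto-generates a fresh `_id` on every index request unless the client supplies one, which is why the re-shipped lines keep landing as new documents. A minimal sketch of the idea I'm after (the index name here is illustrative, not from my config below): derive the ID from a content hash, so that a re-read line maps onto the same document instead of creating a new one:

filter {
  fingerprint {
    method => "SHA1"
    source => ["message"]              # identical lines produce identical fingerprints
  }
}
output {
  elasticsearch {
    hosts => ["localhost"]
    index => "app-logs"                # illustrative name only
    document_id => "%{fingerprint}"    # same content -> same _id -> update instead of duplicate
  }
}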

Logstash input and filter:

input {
  beats {
    port => 5044
    ssl => false
    ssl_certificate => "/etc/pki/tls/certs/logstash-beats.crt"
    client_inactivity_timeout => 200
    ssl_key => "/etc/pki/tls/private/logstash-beats.key"
  }
}
filter {
  if [fields][log_type] == "access" {
    grok {
      match => [ "message", "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{WORD:servername}/%{NOTSPACE:requestpage} HTTP/%{NUMBER:http_version}\" %{NUMBER:server_response} %{NUMBER:answer_size}" ]
    }
  } else if [fields][log_type] == "errors" {
    grok {
      match => { "message" => "%{DATESTAMP:maximotime}(.*)SystemErr" }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "dd.MM.yy HH:mm:ss:SSS"]
    }
    mutate {
      copy => { "message" => "key" }
    }
    mutate {
      gsub => [
        "message", ".*SystemErr     R ", "",
        "key", ".*SystemErr     R", ""
      ]
    }
    truncate {
      fields => "key"
      length_bytes => 255
    }
    fingerprint {
      method => "SHA1"
      source => ["key"]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  } else if [fields][log_type] == "info" {
    grok {
      match => { "message" => [
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\) \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\)"
      ] }
      add_field => {
        "type" => "conditions"
      }
    }
    mutate {
      convert => {
        "execution" => "integer"
      }
    }
    fingerprint {
      method => "SHA1"
      source => ["message"]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:maximotime} (.*)getMboCount %{WORD:object}: mbosets \(%{WORD:mbosets}\), mbos \(%{WORD:mbos}\)" }
        add_field => {
          "type" => "maximoObjectCount"
        }
        remove_tag => ["_grokparsefailure"]
      }
      mutate {
        convert => {
          "mbosets" => "integer"
          "mbos" => "integer"
        }
      }
      fingerprint {
        method => "SHA1"
        source => ["message"]
      }
      if "_grokparsefailure" in [tags] {
        drop { }
      }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "yyyy-MM-dd HH:mm:ss:SSS"]
      target => "maximotime"
    }
  }
}
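To sanity-check what the fingerprint filter produces before involving Filebeat, a throwaway stdin pipeline can be used (this assumes a local Logstash install; pasting the same line twice should print two events carrying the same fingerprint):

input { stdin { } }
filter {
  fingerprint {
    method => "SHA1"
    source => ["message"]
  }
}
output {
  stdout { codec => rubydebug }   # dumps each event, including the computed "fingerprint" field
}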

Logstash output:

output {
  stdout { codec => rubydebug }
  if [fields][log_type] == "access" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  } else if [fields][log_type] == "errors" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  } else if [fields][log_type] == "info" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
      document_id => "%{fingerprint}"
    }
  }
}
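Note that only my info branch sets document_id so far; the errors branch computes a fingerprint in the filter but its output still lets Elasticsearch auto-generate IDs, so those documents would keep duplicating. Presumably the same fix would apply to that branch too, along these lines:

} else if [fields][log_type] == "errors" {
  elasticsearch {
    hosts => ["localhost"]
    manage_template => false
    index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
    document_id => "%{fingerprint}"   # reuse the SHA1 already computed in the filter
  }
}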

Filebeat.yml:

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
- add_cloud_metadata: ~

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/integration/*.log
  fields:  {log_type: access}
- type: log
  enabled: true
  paths:
    - /var/log/maximo_error_logs/*.log
  fields:  {log_type: errors}
  exclude_lines: ['^((\*+)|Log file started at:)']
  multiline.pattern: '(^$|(\t|\s)at .*|.*Caused by:.*|.*SystemErr( ){5}R[ \t]{2}at .*|^ru.ocrv..*|^(\s|\t|)null.*|Обратитесь за.*|.*Закрытое со.*|^(\s|\t|)(ORA-.*|BMX.*)|^(\\s|\t)[А-Яа-я].*)|(.*\d more$)'
  multiline.negate: false
  multiline.match: after
- type: log
  enabled: true
  paths:
    - /var/log/maximo_logs/*.log
  fields:  {log_type: info}

output.logstash:
  hosts: ["elk:5044"]
  bulk_max_size: 200
Tags: elasticsearch, logstash, filebeat, elk
1 Answer

Silly me. I was restarting the Filebeat container instead of ELK, so my Logstash config changes were never being applied... It works now, and my Logstash output config looks like this:

document_id => "%{type}-%{fingerprint}"
action => "create"
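For completeness, the info branch of the output above with those two lines applied would look as follows. The action => "create" setting makes Elasticsearch reject a write whose _id already exists (a 409 version conflict) instead of silently overwriting it, so replayed lines are simply dropped:

elasticsearch {
  hosts => ["localhost"]
  manage_template => false
  index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
  document_type => "%{[@metadata][type]}"
  document_id => "%{type}-%{fingerprint}"   # deterministic ID: event type + content hash
  action => "create"                        # existing _id -> 409 conflict, not an update
}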