如何检查logstash是否从suricata接收/解析数据到elasticsearch?

问题描述 投票:0回答:2

尝试在 Mac OS X 10.10.3 Yosemite 上使用 ElasticSearch(v1.5.2)-Logstash(v1.4.2)-Kibana(v4.0.2) 配置 suricata v2.0.8。

suricata.yaml:

# Extensible Event Format (nicknamed EVE) event log in JSON format
  - eve-log:
      enabled: yes
      type: file #file|syslog|unix_dgram|unix_stream
      filename: eve.json
      # the following are valid when type: syslog above
      #identity: "suricata"
      #facility: local5
      #level: Info ## possible levels: Emergency, Alert, Critical,
                   ## Error, Warning, Notice, Info, Debug
      types:
        - alert
        - http:
            extended: yes     # enable this for extended logging information
            # custom allows additional http fields to be included in eve-log
            # the example below adds three additional fields when uncommented
            #custom: [Accept-Encoding, Accept-Language, Authorization]
        - dns
        - tls:
            extended: yes     # enable this for extended logging information
        - files:
            force-magic: yes   # force logging magic on all logged files
            force-md5: yes     # force logging of md5 checksums
        #- drop
        - ssh
        #- smtp
        #- flow

logstash.conf:

input {
  file {
    path => ["/var/log/suricata/eve.json"]
    sincedb_path => ["/var/lib/logstash/"]
    codec =>   json
    type => "SuricataIDPS"
    start_position => "beginning"
  }

}

filter {
  if [type] == "SuricataIDPS" {
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    ruby {
      code => "if event['event_type'] == 'fileinfo'; event['fileinfo']['type']=event['fileinfo']['magic'].to_s.split(',')[0]; end;"
    }
  }

  if [src_ip]  {
    geoip {
      source => "src_ip"
      target => "geoip"
      #database => "/usr/local/opt/logstash/libexec/vendor/geoip/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
    if ![geoip.ip] {
      if [dest_ip]  {
        geoip {
          source => "dest_ip"
          target => "geoip"
          #database => "/usr/local/opt/logstash/libexec/vendor/geoip/GeoLiteCity.dat"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float" ]
        }
      }
    }
  }
}

output {
  elasticsearch {
    host => localhost
    #protocol => http
  }
}

Suricata 将所有事件成功记录到 eve.json。当我在浏览器中打开 kibana 时,我看不到仪表板或来自 suricata 的任何信息...所以我假设 Logstash 没有从 eve.json 读取数据,或者没有将数据解析到 elasticsearch(或两者)...有什么方法可以检查发生了什么吗?

elasticsearch logstash kibana kibana-4 logstash-configuration
2个回答
0
投票

在logstash中打开调试输出:

output {
   stdout {
      codec = rubydebug
   }
}

此外,尝试直接对 Elasticsearch 运行查询 (curl),而不是使用 kibana。


-1
投票

我将 nginx 日志改编为 suricata 日志。我可以在 suricata 日志中获取 geoip 信息。我通过样本进行调整并发送到 filebeat 中配置的日志文件。

例如: nginx.access.referrer:NAT 的 ET INFO 会话遍历实用程序(STUN 绑定请求)[**

nginx.access.geoip.location: { “长”:-119.688, “纬度”:45.8696 }

使用样本读取 suricata 日志并将其发送到将进行适配的 shell 脚本。

例如: echo "$IP - - [$nd4] \"GET $IP2:$PORT2 --- $TYPE HTTP/1.1\" 777 0 \"$CVE\" \"Mozilla/5.0 (无) (无) 无\" “ >> /var/log/suricata_mod.log

然后配置filebeat.yml:

  • 文档类型:nginx-access

    路径:

    • /var/log/suricata_mod.log

重新启动filebeat。

最后配置logstash:

    filter {
         if [type] == "nginx-access" {
       grok {
  match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[$
  remove_field => "message"} 

    mutate {      
    add_field => { "read_timestamp" => "%{@timestamp}" }} 

    date {
    match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
    remove_field => "[nginx][access][time]"}

    useragent {
       source => "[nginx][access][agent]"
       target => "[nginx][access][user_agent]"
       remove_field => "[nginx][access][agent]"} 

    geoip {
       source => "[nginx][access][remote_ip]"
       target => "[nginx][access][geoip]"
       database => "/opt/GeoLite2-City.mmdb"}} }     output {
      elasticsearch {
          hosts => [ "xxx.xxx.xxx.xxx:9200" ]
          manage_template => false
          document_type => "%{[@metadata][type]}"
          index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"}}

并重新启动logstash。在 Kibana 中创建一个 filebeat-* 索引。准备好了。

© www.soinside.com 2019 - 2024. All rights reserved.