我正在使用filebeat 7.x收集日志数据,但我面临一个问题,即日志的大小非常大(每天100GB)。
现在我在想如何从源文件中收集错误级别的日志。最好的方法是什么?
我正在使用filebeat来发送日志到kubernetes集群中的elasticsearch,我担心的是我是否必须使用kafka和logstash来定义规则?
请看下面正在使用的filebeat配置文件。
{
"filebeat.yml": "filebeat.inputs:
- type: container
paths:
- /var/log/containers/*.log
processors:
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: \"/var/log/containers/\"
output.elasticsearch:
host: '${NODE_NAME}'
hosts: '${ELASTICSEARCH_HOSTS:elasticsearch-master:9200}'
"
}
Filebeat → Kafka → Logstash → ElasticSearch → Kibana。
Filebeat读取&根据配置将服务器上的日志推送到Kafka主题上,然后,Logstash将从kafka主题上订阅这些日志,并根据需要进行解析过滤格式化,包括字段,并将处理后的日志数据发送到Elasticsearch Index。
然后,Logstash将订阅这些来自Kafka主题的日志,并按照要求进行解析过滤、格式化、排除和包含字段,并将处理后的日志数据发送到Elasticsearch Index。
通过仪表盘可视化你的数据
如果你问的是这个问题,那么Kafka并不是一个 "必须"。如果你受限于弹性集群中可以捕获的数据量,那么可以在你的logstash配置中应用解析逻辑来解析和过滤你需要的日志。Filebeat会将数据发送给logstash,logstash经过解析和过滤后会发送给elastic。