I have a variable 'workflow' in my code, and whenever any event matches that filter I want to retain its value for use by the next event. The problem: by using add_field, the value of workflow shows up in the CSV for events that actually contain a workflow, but for all other events the literal text %{workflow} is written instead. Any suggestions would be appreciated.
input {
  file {
    path => "D:/alerts extracted from mails/*.log"
    start_position => beginning
    sincedb_path => "NUL"
  }
}
filter {
  grok {
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} Workflow: (\[%{WORD:workflow}\]) %{GREEDYDATA:info}" }
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} %{GREEDYDATA:info}" }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  mutate {
    add_field => { "client_id" => "BP" }
    add_field => { "portfolio_id" => "NAGP" }
    add_field => { "resource_id" => " " }
    add_field => { "resource_name" => "%{workflow}" }
    add_field => { "lob" => "creditrisk" }
    add_field => { "mess" => "%{info}" }
  }
}
output {
  csv {
    fields => ["client_id","portfolio_id","resource_id","resource_name","lob","mess_time","mess"]
    path => "C:/Users/saavi.verma/Downloads/logstash-7.6.2/output_files/output1.csv"
  }
}
You can always check whether the field workflow exists (meaning the field was extracted correctly, i.e. the first grok pattern matched). You would do the conditional check like this:
input {
  file {
    path => "D:/alerts extracted from mails/*.log"
    start_position => beginning
    sincedb_path => "NUL"
  }
}
filter {
  grok {
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} Workflow: (\[%{WORD:workflow}\]) %{GREEDYDATA:info}" }
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} %{GREEDYDATA:info}" }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  # check if the field 'workflow' exists
  if [workflow] {
    mutate {
      add_field => { "resource_name" => "%{workflow}" }
    }
  }
  mutate {
    add_field => { "client_id" => "BP" }
    add_field => { "portfolio_id" => "NAGP" }
    add_field => { "resource_id" => " " }
    add_field => { "lob" => "creditrisk" }
    add_field => { "mess" => "%{info}" }
  }
}
output {
  csv {
    fields => ["client_id","portfolio_id","resource_id","resource_name","lob","mess_time","mess"]
    path => "C:/Users/saavi.verma/Downloads/logstash-7.6.2/output_files/output1.csv"
  }
}
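The behavior you are seeing comes from how Logstash expands field references: a `%{field}` reference in add_field is substituted only when the field exists on the event; otherwise the reference is left in the output verbatim. Here is a rough Python model of that expansion (not Logstash itself; the event contents such as "nightly_load" are made-up values for illustration):

```python
import re

def sprintf(template, event):
    """Rough model of Logstash's %{field} expansion: references to
    fields present on the event are substituted; references to missing
    fields are left verbatim, which is why '%{workflow}' ends up in
    the CSV for events matched by the second grok pattern."""
    return re.sub(
        r"%\{(\w+)\}",
        lambda m: str(event[m.group(1)]) if m.group(1) in event else m.group(0),
        template,
    )

# Event matched by the first grok pattern: 'workflow' was extracted.
with_wf = {"workflow": "nightly_load", "info": "step finished"}
# Event matched by the second pattern: no 'workflow' field.
without_wf = {"info": "step finished"}

print(sprintf("%{workflow}", with_wf))     # nightly_load
print(sprintf("%{workflow}", without_wf))  # %{workflow}
```

The `if [workflow]` guard in the config above avoids the second case by never running the add_field when the reference would not resolve.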
However, there is a more elegant and efficient way to achieve the same result without evaluating a conditional: instead of the add_field operation, use the rename option. You simply rename the field workflow to resource_name. If the workflow field does not exist, the operation raises no error and processing continues. You can implement it like this:
input {
  file {
    path => "D:/alerts extracted from mails/*.log"
    start_position => beginning
    sincedb_path => "NUL"
  }
}
filter {
  grok {
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} Workflow: (\[%{WORD:workflow}\]) %{GREEDYDATA:info}" }
    match => { "message" => "(?<mess_time>%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}) : %{DATA:Title}> %{WORD:Code} %{GREEDYDATA:info}" }
  }
  if "_grokparsefailure" in [tags] {
    drop { }
  }
  mutate {
    add_field => { "client_id" => "BP" }
    add_field => { "portfolio_id" => "NAGP" }
    add_field => { "resource_id" => " " }
    # rename field; no failure if it does not exist
    rename => { "workflow" => "resource_name" }
    add_field => { "lob" => "creditrisk" }
    add_field => { "mess" => "%{info}" }
  }
}
output {
  csv {
    fields => ["client_id","portfolio_id","resource_id","resource_name","lob","mess_time","mess"]
    path => "C:/Users/saavi.verma/Downloads/logstash-7.6.2/output_files/output1.csv"
  }
}
Note that if the resource_name field does not exist, the csv output plugin will simply write an empty string instead. From the plugin documentation for the fields option:

    Field names from the event that should be written to the CSV file. Fields are written to the CSV in the same order as the array. If a field does not exist on the event, an empty string will be written.
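The combination of these two behaviors (rename is a no-op on a missing field; the csv output emits an empty string for a missing field) is what makes the rename approach safe. A minimal Python sketch of both, using made-up event values:

```python
def rename_field(event, old, new):
    """Model of mutate's rename option: move the value if the source
    field exists; silently do nothing if it does not (no error)."""
    if old in event:
        event[new] = event.pop(old)
    return event

matched = rename_field({"info": "ok", "workflow": "nightly_load"},
                       "workflow", "resource_name")
unmatched = rename_field({"info": "ok"}, "workflow", "resource_name")

print(matched)    # {'info': 'ok', 'resource_name': 'nightly_load'}
print(unmatched)  # {'info': 'ok'}  -- untouched, no failure

# The csv output writes an empty string for fields missing from the event:
fields = ["client_id", "resource_name", "info"]
print([unmatched.get(f, "") for f in fields])  # ['', '', 'ok']
```

So even without any conditional, every row keeps the same column layout; events with no workflow just get an empty resource_name cell.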
You may also want to adapt the fields option of the csv output plugin to account for the presence or absence of that field:
output {
  if [resource_name] {
    csv {
      fields => ["client_id","portfolio_id","resource_id","resource_name","lob","mess_time","mess"]
      path => "C:/Users/saavi.verma/Downloads/logstash-7.6.2/output_files/output1.csv"
    }
  } else {
    csv {
      fields => ["client_id","portfolio_id","resource_id","lob","mess_time","mess"]
      path => "C:/Users/saavi.verma/Downloads/logstash-7.6.2/output_files/output1.csv"
    }
  }
}
Hope this helps.