Logstash 父子关系 配置

时间:2022-12-23 23:49:20

最近在使用Lostash的过程中遇到了一个问题:在一个log文件里包含两类数据,而且两类数据之间存在父子关系,那如何使用lostash的configuration实现这个需求呢

思路:

  1. 首先定义父事件的pattern,因为子事件不匹配父pattern,所以logstash会自动为子事件添加_grokparesefailure 标签。通过该标签即可知道当前事件是父事件还是子事件
  2. 使用filter->ruby生成document_id,并把它放到ruby全局变量中 ,这样子事件就可以访问到父事件的document_id
  3. 同时为父事件和子事件添加一个字段例如doc_id用来存放步骤二中生成的document_id,单独为子事件添加一个字段例如parent_id,用来存储父事件的document_id。

在此要感谢elastic官方论坛的一个帖子:"keep global variable in logstash",它让我知道了如何使用filter->ruby来实现全局变量。

以下是logstash的完整配置

input {
beats {
port => 5044
}
} filter {
# remove the empty lines
if [message] =~ /^\s*$/ {
drop { }
}
# define parent event pattern
grok {
match => {"message" => "%{DATESTAMP:EventTime},%{NUMBER:Mil:INT} %{WORD:Type} %{GREEDYDATA:Item} %{GREEDYDATA:RIC} %{GREEDYDATA:Detail} %{GREEDYDATA:Category}"}
}
# children events
if "_grokparsefailure" in [tags] {
grok {
match => {"message" => "\<%{NUMBER:FID:INT}\>,%{GREEDYDATA:FName},%{WORD:FType},%{GREEDYDATA:FValue}"}
add_field => {"DocID" => '' "ParentID" => ''}
add_tag => ["%{FType}"]
remove_tag => ["_grokparsefailure"]
}
ruby {
code => "require 'digest/md5';
event['ParentID'] = @@parentid;
event['DocID'] = Digest::MD5.hexdigest(@@parentdate+event['FID'])"
}
}
else{
mutate {
add_field => {"DocID" => ''}
add_tag => ["parent"]
}
# define a global variable to keep the parent id
# must set the default value for the variables in ruby -> init block, or it will raise exception
ruby {
init => "@@parentid = '';@@parentdate=''"
code => "require 'digest/md5';
@@parentid = Digest::MD5.hexdigest(event['EventTime']+event['Mil']);
event['DocID'] = @@parentid;
@@parentdate = event['EventTime']+event['Mil']"
}
}
#remove the redundant fields created by filebeat. you can ignore it if you don't use filebeat as shipper
mutate {
remove_field => ["[beat][hostname]","[beat][name]","count","fields","input_type","offset","type","beat","@version"]
}
} output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
#set the document_id
document_id => %{"DocID"}
document_type => "%{[@metadata][type]}"
#template => "/appserver/ELK/logstash-2.3.4/conf/template_tolreport.json"
#template_name =>"template_tolreport"
#template_overwrite => true
}
# file {
# path => "./test-%{+YYYY-MM-dd}.txt"
# }
}

英语好的同学可以参考我在elastic 的官方论坛中发的帖子:https://discuss.elastic.co/t/logstash-parent-child-event-configuration/58117