ELK filebeat和logstash使用:配置单个文件来源、配置多个文件来源
# 数据输入:logstash单独使用的话的直接从我们的log文件中输入数据,这里我们从filebeat中输入数据 port 与 Logstash output 中我们配置的端口号相呼应
input {
beats {
port => 9601
}
#file {
#path => "E:/test/log/*/*.log"
#start_position => "beginning"
#stat_interval => 3
#}
}
# 数据过滤
filter {
if [fields][logtype] == "testlog" { # [fields][logtype] 等于 filebeat中的logtype字段
if ([message]=~ "正则表达式") { # message就是指原始消息:如果原始消息满足这个正则表达式,则丢弃该记录
drop{}
}
grok {
match => {
# 尝试匹配message的数据,把匹配到的数据存入es表,字段名如下 为自定义的 timestamp,client_ip,...
# message配置:string or array
"message" => [
"\[%{TIMESTAMP_ISO8601:timestamp}\] %{IP:client_ip} %{USERNAME:method} %{URL:url}%{CUSTOMURIPARAM:param} %{NUMBER:duration}"
]
}
}
# 把我们的时间格式 YYYY-mm-dd HH:ii:ss 转化为时间戳的形式,存入到es:替换es的@timestamp字段
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
# 由于我们匹配出来了一个timestamp字段,并且替换了es默认的@timestamp字段;所以es的字段timestamp和@timestamp值是想等的,我们删除自己匹配出来的字段timestamp
mutate{
remove_field => ["timestamp"]
}
}
}
# 数据输出
output {
if [fields][logtype] == "testlog" { #文件来源是 testlog,对应filebeat的 logtype 字段
if "_grokparsefailure" not in [tags]{ #匹配成功,存es的success-log 表
elasticsearch {
# hosts配置:string or array
hosts => ["localhost:9200"]
index => "success-log-%{+}"
}
} else { #匹配失败,存es的error-log 表
elasticsearch {
hosts => ["localhost:9200"]
index => "error-log-%{+}"
}
}
} #else { #当filebeat有多个日志文件来源时,根据filebeat的 logtype 字段判断要存入哪张es表
#elasticsearch {
#hosts => ["localhost:9200"]
#index => "other-log-%{+}"
#}
#}
}