ELK filebeat和logstash使用:配置单个文件来源、配置多个文件来源

时间:2025-04-09 09:14:28
# 数据输入:logstash单独使用的话的直接从我们的log文件中输入数据,这里我们从filebeat中输入数据 port 与 Logstash output 中我们配置的端口号相呼应 input { beats { port => 9601 } #file { #path => "E:/test/log/*/*.log" #start_position => "beginning" #stat_interval => 3 #} } # 数据过滤 filter { if [fields][logtype] == "testlog" { # [fields][logtype] 等于 filebeat中的logtype字段 if ([message]=~ "正则表达式") { # message就是指原始消息:如果原始消息满足这个正则表达式,则丢弃该记录 drop{} } grok { match => { # 尝试匹配message的数据,把匹配到的数据存入es表,字段名如下 为自定义的 timestamp,client_ip,... # message配置:string or array "message" => [ "\[%{TIMESTAMP_ISO8601:timestamp}\] %{IP:client_ip} %{USERNAME:method} %{URL:url}%{CUSTOMURIPARAM:param} %{NUMBER:duration}" ] } } # 把我们的时间格式 YYYY-mm-dd HH:ii:ss 转化为时间戳的形式,存入到es:替换es的@timestamp字段 date { match => ["timestamp", "ISO8601"] target => "@timestamp" } # 由于我们匹配出来了一个timestamp字段,并且替换了es默认的@timestamp字段;所以es的字段timestamp和@timestamp值是想等的,我们删除自己匹配出来的字段timestamp mutate{ remove_field => ["timestamp"] } } } # 数据输出 output { if [fields][logtype] == "testlog" { #文件来源是 testlog,对应filebeat的 logtype 字段 if "_grokparsefailure" not in [tags]{ #匹配成功,存es的success-log 表 elasticsearch { # hosts配置:string or array hosts => ["localhost:9200"] index => "success-log-%{+}" } } else { #匹配失败,存es的error-log 表 elasticsearch { hosts => ["localhost:9200"] index => "error-log-%{+}" } } } #else { #当filebeat有多个日志文件来源时,根据filebeat的 logtype 字段判断要存入哪张es表 #elasticsearch { #hosts => ["localhost:9200"] #index => "other-log-%{+}" #} #} }