Logstash使用grok插件解析Nginx日志

时间:2023-03-10 01:35:16
Logstash使用grok插件解析Nginx日志

grok表达式的打印复制格式的完整语法是下面这样的:

%{PATTERN_NAME:capture_name:data_type}
data_type 目前只支持两个值:int 和 float。

在线gork正则的地址:http://grokdebug.herokuapp.com/
Logstash基础正则地址:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

也可以在你的安装路径下查找grok-patterns内置的正则表达式:

/usr/local/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

例如:

URIPATH (?:/[A-Za-z0-$.+!*'(){},~:;=@#%&_\-]*)+
URIPARAM \?[A-Za-z0-$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*

-----------------------------------------------------------------------------------------

logstash-filter-mutate 插件是Logstash 另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等

可以设置的转换类型包括:"integer","float" 和 "string"。示例如下:

filter {
mutate {
convert => ["request_time", "float"]
gsub => [ "message", "aa", "" ] 字符串替换,此处为清除
}
}

注意:mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。

对已有索引进行修改并且平滑过渡:

        mutate {
convert => [ "request_time", "float" ]
add_field => [ "response_time", "%{request_time}" ]
remove_field => [ "request_time" ]
}

--------------------------------------------------------------------------------

在线grok表达式解析地址:http://grokdebug.herokuapp.com/  (NGINXACCESS为规则名称,测试时不用填入)

根据已经生成的日志记录逐个参数进行调试即可。

需要注意的是当日志或可能出现“-”时,必须在NGINXACCESS中指定,例如: (?:%{NUMBER:serverelapsed}|-)

需要修改的文件位置:/usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

如果找不到的话,grep -rw NGINXACCESS /usr/local/logstash

首先对nginx日志格式设定如下:

log_format  main  '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';

Logstash Grok默认的参数为:

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{IPORHOST:host} %{BASE10NUM:request_duration}

我这边设置的是:

#nginx
WZ ([^ ]*)
NGINXACCESS %{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{WZ:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:bytes} (?:%{QS:referer}|-) %{QS:agent} %{NUMBER:elapsed} (?:%{NUMBER:serverelapsed}|-) %{QS:xforward}

或拆分请求的uri和请求参数:此处需要注意的是必须确保NGINXACCESS规则是一整行

WZ ([^ ]*)
URIPARAM [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]* #此处重新定义该匹配规则,不匹配参数开头的?
NGINXACCESS %{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] "%{WORD:method} (%{URIPATH:request}|-|) (%{URIPARAM:requestParam}|-)" %{NUMBER:status}
%{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}

自定义规则时也可以在/usr/local/logstash主目录下新建一个目录:patterns,并在该目录下新建一个grok的表达式解析文件nginx,内容同上,也可以实现相同的目的。

Nginx logstash配置文件:

input{
file{
path => "/usr/local/nginx/logs/access.log"
type => "nginx"
start_position => "beginning"
} file{
path => "/usr/local/nginx/logs/admin.log"
type => "admin-nginx"
start_position => "beginning"
}
} filter {
grok {
match => { "message" => "%{NGINXACCESS}" }
}
mutate {
convert => [ "elapsed", "float" ]
convert => [ "serverelapsed", "float" ]
}
} output{
if [type] == "nginx" {
elasticsearch {
hosts=> ["172.17.102.202:9200"]
index=> "nginx"
}
}
else {
elasticsearch {
hosts=> ["172.17.102.202:9200"]
index=> "admin-nginx"
}
}
}