使用logstash进行ip映射(主机名或系统名)

时间:2024-04-09 21:22:28

需求场景

当使用elasticsearch进行日志数据可视化的时候,往往会遇到需要IP地址无法human-reading的情况。这时,我们需要将IP地址进行一定的格式转换,将其转换为主机名(hostname)或者系统名(application/service name)。

以WAF的日志为例,里面的dst_ip记录了被攻击的主机ip,scr_ip记录了发起攻击的系统的ip:
使用logstash进行ip映射(主机名或系统名)
可视化后,如果以dst_ip进行聚合,我们无法清楚的看到是哪个系统遭受了攻击
使用logstash进行ip映射(主机名或系统名)
因此,我们需要将IP到系统名进行一个映射。实际的生产工作中,我们一定还会在其他的地方遇到类似的需求。

解决方案

如果只有一两个值的情况,我们可以用script field和plainless的方式来解决。但如果我们有大量的记录,每次都通过脚本的方式进行转换会为集群带来大量的负担,并且会需要更长的处理时间,因此,我们需要在数据写入的时候就进行IP地址的转换。

logstash上提供了一个dns_filter_plugin,可以进行dns的查找。其中有几个重要的参数:

  • resolve, 是指将A记录(主机名),CNAME记录(服务名)等域名转换为IP地址
  • reverse,是指反向映射,将IP地址映射为域名
  • hostsfile, 包含映射关系的hosts文件
  • action, 需要执行的操作,提供的操作类型是replace和append。replace是现场替换,直接将field里的value进行替换。append是附加,会将field里的value转换为数组,并且附加到数组的末尾。

测试示例

我们可以快速的通过logstash的其他plugin,一起来验证dns是否可以完成我们的需求。

  • input: 使用generate plugin
  • filter:使用dns plugin
  • output: 使用stdout plugin

新建一个conf文件:

input{

  generator {
    message => "192.168.135.105"
    count => 1
  }

}

filter {
	dns {
	  action => "replace"
	  reverse => [ "message" ] ## 我们使用reverse,是因为我们需要根据IP,知道系统
	  hostsfile => ["/Users/Lex/Applications/myhosts"]
	}

}

output{

  stdout{
    codec => json
  }
}

先来一个简单的myhosts:

192.168.135.105         前置应用2

注意,这里的域名是中文的,因此这并非一个看起来合法的hosts文件,但是无所谓,我们要做的只是一个mapping。

运行一下logstash,我们来看看输出:

[2019-04-04T08:42:41,044][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"host":"MacBook-Pro.local","@timestamp":"2019-04-04T00:42:40.991Z","@version":"1","sequence":0,"message":"前置应用2"}[2019-04-04T08:42:41,488][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}

message的值已经被转换为了前置应用2
我们添加更多的值来试试,将input改为下表:

input{

    generator {
    #message => "192.168.135.105"
    lines => [
      "192.168.200.22",
      "192.168.200.23",
      "192.168.200.24"
    ]
    count => 1
  }
}

对应的hosts文件改为:

192.168.135.105         前置应用2
192.168.200.22          客服系统
192.168.200.23          客服系统
192.168.200.24          客服系统
192.168.200.25          客服系统

测试一下:

{"@timestamp":"2019-04-04T00:59:12.424Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}{"@timestamp":"2019-04-04T00:59:12.453Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}{"@timestamp":"2019-04-04T00:59:12.455Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}[2019-04-04T08:59:12,904][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}

没问题。
我们把原始的ip地址也保留一下:

filter {
	mutate {
	  add_field => { "service" => "%{message}" }
	}
	dns {
	  action => "replace"
	  reverse => [ "service" ] ## 我们使用reverse,是因为我们需要根据IP,知道系统
	  hostsfile => ["/Users/Lex/Applications/myhosts"]
	}

}

这时输出为:

[2019-04-04T09:04:32,714][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"@version":"1","message":"192.168.200.24","sequence":0,"@timestamp":"2019-04-04T01:04:32.711Z","service":"客服系统","host":"MacBook-Pro.local"}{"@version":"1","message":"192.168.200.23","sequence":0,"@timestamp":"2019-04-04T01:04:32.710Z","service":"客服系统","host":"MacBook-Pro.local"}{"@version":"1","message":"192.168.200.22","sequence":0,"@timestamp":"2019-04-04T01:04:32.598Z","service":"客服系统","host":"MacBook-Pro.local"}[2019-04-04T09:04:33,094][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}

我们将系统名保存到了service中。

性能测试与调优

从代码上看,因为logstash每个worker每次处理一条event,在进入pipeline之后,每个包含映射逻辑的event都需要线程去加载一个文件,并且在这个文件中检索某个IP地址或域名,当我们有大量的吞吐时,可能会影响到实时性,看看文档上是怎么说的:

This filter, like all filters, only processes 1 event at a time, so the use of this plugin can significantly slow down your pipeline’s throughput if you have a high latency network. By way of example, if each DNS lookup takes 2 milliseconds, the maximum throughput you can achieve with a single filter worker is 500 events per second (1000 milliseconds / 2 milliseconds).

(显然,别人文档上是说当you have a high latency network时,才会影响吞吐,因为我们用的是hosts文件,而不是DNS server,索引不会受网络的影响,但总之,我们得测试一下)

关于性能的优化,这里的可选项是缓存,对应的参数是:

  • failed_cache_size
    • Value type is number
    • Default value is 0
    • cache size for failed requests
  • failed_cache_ttl
    • Value type is number
    • Default value is 5
    • how long to cache failed requests (in seconds)
  • hit_cache_size
    • Value type is number
    • Default value is 0
    • set the size of cache for successful requests
  • hit_cache_ttl
    • Value type is number
    • Default value is 60
    • how long to cache successful requests (in seconds)

我们都先用默认值,我们添加100个IP地址,循环发送5000次:

real	0m33.916s
user	2m14.692s
sys	0m7.794s

real	0m34.766s
user	2m15.708s
sys	0m7.777s

real	0m34.563s
user	2m14.599s
sys	0m7.626s

可能是因为我的电脑比较快,24秒处理50万条,当然,这里是多线程。因为没有其他cpu+memory的基线数据,所以只能以自己的mac作为基线了。

然后我修改了hit_cache_size参数,设置为100(因为我只有100个IP)。发现基本上最终的参数没有什么变化,在循环次数少的(循环100和1000)次的情况下,使用的时间还稍微多了点。

因此,在简单的测试了几分钟后,结论是:
没有遇到明显的性能问题的情况下,无需设置cache相关的参数。因为没有读源码,可能cache是默认打开了的