windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统

时间:2024-03-19 15:43:35

windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统

介绍

在日常运维工作中,分布式系统的日志查看很费劲。所以在这里分享一下自己部署的filebeat + kafka + ELK开源实时日志分析平台的记录过程。

1、ELK介绍

开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成:
1)ElasticSearch是一个基于Lucene的开源分布式搜索服务器。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
在elasticsearch中,所有节点的数据是均等的。
2)Logstash是一个完全开源的工具,它可以对你的日志进行收集、过滤、分析,支持大量的数据获取方法,并将其存储供以后使用(如搜索)。说到搜索,logstash带有一个web界面,搜索和展示所有日志。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
3)Kibana 是一个基于浏览器页面的Elasticsearch前端展示工具,也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

2、为什么要采用filebeat

Logstash是一个功能强大的日志服务,不过会占用较多的系统资源,filebeat作为一个轻量级的日志服务,具有以下特点:

1)健壮,从没错过任何一个细节

在读取和转发日志行的过程中,如果被中断,Filebeat会记录中断的位置。并且,当重新联机时,Filebeat会从中断的位置开始。

2)它不会让你超负荷工作

当发送数据到Logstash或Elasticsearch时,Filebeat使用一个反压力敏感(backpressure-sensitive)的协议来解释高负荷的数据量。当Logstash数据处理繁忙时,Filebeat放慢它的读取速度。一旦压力解除,Filebeat将恢复到原来的速度,继续传输数据。

3)使简单的事情保持简单

Filebeat附带了内部模块(auditd、Apache、Nginx、System和MySQL),这些模块简化了普通日志格式的聚集、解析和可视化。结合使用基于操作系统的自动默认设置,使用Elasticsearch Ingest Node的管道定义,以及Kibana仪表盘来实现这一点。

3、使用kafka

使用kafka,有着降低系统耦合度,缓冲消息,提高吞吐方面的作用,并且使系统的扩展变得很容易。

功能快捷键

首先到elastic官网https://www.elastic.co/cn/downloads/下载使用到的组件 并上传到准备安装的服务器。由于本次安装的环境在阿里云上,所以不用解决错综复杂的网络环境,默认服务器间是在一个局域网中的,无需打通网络环境。

部署架构如下图:
windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统

当前在测试环境搭了一套单点的服务,下面的内容为单点情况下部署的步骤,之后会补充集群的部署情况。

elasticsearch

下载解压elasticsearch
elasticsearch/config/elasticsearch.yml 为配置文件
配置network.host、端口号和Node 默认是 http://localhost:9200/
windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统
bin/elasticsearch.bat 为启动文件

elasticsearch用法自行百度

kibana

下载解压elasticsearch
kibana/config/kibana.yml 为配置文件
修改端口号、host、elasticsearch-host配置 默认是 http://localhost:5601/windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统
bin/kibana.bat 为启动文件

kibana用法自行百度

logstash

配置文件需要自己写,可参考config里面的logstash-sample.conf,或者直接访问官方的文档

#输入为kafka,topics是test,查询时间是1秒
input {
kafka {
enable_auto_commit => true
auto_commit_interval_ms => “1000”
codec => “json”
bootstrap_servers => “192.168.1.223:9092”
topics => [“test”]
}
#beats {
#port => 5044
#}
}
filter { # 对不同的log来源采用不同的处理逻辑
grok {
match => { “message” => “[%{WORD:info_level} %{DATESTAMP:timestamp} %{WORD:temp}:%{NUMBER:temp}] %{NUMBER:status} %{WORD:method} %{URIPATHPARAM:request} (%{IP:ip}) %{GREEDYDATA:C}”
} # 对日志整体进行切分
}
mutate {
split => [“request”, “?”]
add_field => {
“uri” => “%{[request][0]}”
}# 对切分的部分字段再加工
}
date {
match => [“timestamp”, “dd-MM-YY HH:mm:ss”]
timezone => “+08:00” # 默认存储时间会有时间差,+8以消除时差
} # 重定义timestamp,该字段为kibana时间统计的主要字段
}
#stdout{ codec=>rubydebug}用来调试使用,运行时会输出到屏幕,调试完成注释掉
#在filebeat里面我们定义了fields的logtype,在这里可以用来区别不同的日志
#index为ES的数据库index名
output {
stdout{ codec=>rubydebug}
if [fields][logtype] == “syslog”{
elasticsearch {
hosts => [“192.168.1.108:9200”]
index => “zjslog-%{+YYYY.MM.dd}”
}
}
if [fields][logtype] == “ejabberdlog”{
elasticsearch {
hosts => [“192.168.1.108:9200”]
index => “ejabberdlog-%{+YYYY.MM.dd}”
}
}
}
}

bin\logstash -f config\logstash.conf 为根据logstash.conf文件启动命令

filebeat

filebeat/filebeat.yml 为配置文件

锁定log日志文件windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统

输出的配置 windows简单搭建 filebeat + kafka + logstash + Elasticsearch + Kibana日志收集系统
filebeat -e -c filebeat.yml 我启动命令