ELK ElasticFlow数据流分析图和部署

时间:2023-02-14 15:30:36

 一、ELK ElasticFlow介绍和分析图

1、ElasticFlow的目的

ElasticFlow利用sFlow等技术来收集网络中有关流量的信息,集流量收集、分析、报告于一体,深入了解流量与带宽的占用情况等用户最关心的问题,为全面了解企业的网络活动,合理有效分配和规划网络带宽提供科学的依据,保证企业的关键业务应用畅通运行。小结目的1、确定带宽使用者以及情况;2、网络的流量&协议可视化能力、调查问题的根源

2、ElasticFlow分析图

2.1 Overview(总揽)

图里发现10.88.3.1承担了28%的流量,追踪发现3.1是一个无线控制器,简单分析得知局域网的流量有线占据72%无线占据28%

ELK ElasticFlow数据流分析图和部署

2.2 Top-N(排行)

通过挖掘POP3接收SMTP发送Email协议情况、SSH判断连接servers是否符合情况和客户端流量

ELK ElasticFlow数据流分析图和部署

2.3 Threats(威胁)

接入TALOS团队对IP信誉进行评估

ELK ElasticFlow数据流分析图和部署

2.4 Flows(流)

已知局域网10.88.35.15是台FTP服务器,通过flows得知今天谁访问过,统计周期内流量大小情况

ELK ElasticFlow数据流分析图和部署

已知局域网10.88.35.4是台AD服务器,通过flows得知与其交互情况,感知局域网的威胁

ELK ElasticFlow数据流分析图和部署

2.5 Geo IP(地理知识产权)

目标地址的世界地理位置

ELK ElasticFlow数据流分析图和部署

2.6 AS Traffic(自治系统流量)

ELK ElasticFlow数据流分析图和部署

2.7 Exporters(网络接口流量)

分析设备接口和总背板吞吐情况,入口Ingress Interfaces (bits/s)、出口Egress Interfaces (bits/s)

ELK ElasticFlow数据流分析图和部署

2.8 Traffic Details(流量详细图)

通过查看SSH协议判断servers和状态、通过查看3389审计远程主机合法性。统计周期时间轴流量情况

ELK ElasticFlow数据流分析图和部署

2.9 Flows Records(流记录)

Flows的时间流水

ELK ElasticFlow数据流分析图和部署

二、ELK ElasticFlow部署

1、jdk环境和其他工具准备

yum install java-openjdk-devel java-openjdk    # jdk
java -version
yum install git unzip net-tools lrzsz

2、下载elk安装包并安装

# 创建安装包存放目录
mkdir -p /home/apps/elk
cd /home/apps/elk
# 下载安装包
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-x86_64.rpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.8.1-x86_64.rpm
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.rpm
# 安装
rpm -ivh elasticsearch-7.8.1-x86_64.rpm kibana-7.8.1-x86_64.rpm logstash-7.8.1.rpm
# 加入systemd管理,使其开机自启
systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl enable kibana.service
systemctl enable logstash.service

3、修改配置文件

vim /etc/elasticsearch/elasticsearch.yml

检查配置文件关键配置如下,通过esc+dG批量删除且粘贴下述内容

[root@testhost elk]# grep -v "^$\|^#" /etc/elasticsearch/elasticsearch.yml
cluster.name: elastiflow
node.name: elastiflow
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 0.0.0.0
http.port: 9200
action.destructive_requires_name: true
indices.query.bool.max_clause_count: 8192
search.max_buckets: 250000
discovery.type: single-node

修改es的jvm配置

​vim /etc/elasticsearch/jvm.options​检查配置文件关键配置如下

-Xms4g
-Xmx4g

修改kibana配置文件​vim /etc/kibana/kibana.yml​

server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]

4、启动服务

顺序启动

systemctl restart elasticsearch.service
systemctl status elasticsearch.service
systemctl restart kibana.service
systemctl status kibana.service

5、安装logstash plugin

/usr/share/logstash/bin/logstash-plugin install logstash-codec-sflow
/usr/share/logstash/bin/logstash-plugin install logstash-codec-netflow
/usr/share/logstash/bin/logstash-plugin install logstash-input-udp
/usr/share/logstash/bin/logstash-plugin install logstash-input-tcp
/usr/share/logstash/bin/logstash-plugin install logstash-filter-dns
/usr/share/logstash/bin/logstash-plugin install logstash-filter-geoip
/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate

6、logstash中的elastiflow模块下载部署

git clone https://github.com/robcowart/elastiflow.git
wget https://github.com/robcowart/elastiflow/archive/master.zip
unzip master.zip
cp -a /home/apps/elk/elastiflow-master/logstash/elastiflow/. /etc/logstash/elastiflow/
cp -a /home/apps/elk/elastiflow-master/logstash.service.d/. /etc/systemd/system/logstash.service.d/
sz /home/apps/elk/elastiflow-master/kibana/elastiflow.kibana.7.8.x.ndjson # 把kibana模板保存到宿主机,后面步骤需要在kibana页面中导入此模板文件

修改logstash jvm配置文件​vim /etc/logstash/jvm.options​(此项很重要,需要把jvm堆改大,否则logstash启动会报OOM)

[root@testhost elk]# grep -v "^$\|^#" /etc/logstash/jvm.options 
-Xms4g
-Xmx4g
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djruby.compile.invokedynamic=true
-Djruby.jit.threshold=0
-Djruby.regexp.interruptible=true
-XX:+HeapDumpOnOutOfMemoryError
-Djava.security.egd=file:/dev/urandom
-Dlog4j2.isThreadContextMapInheritable=true

修改管道配置文件​vim /etc/logstash/pipelines.yml​

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: elastiflow
path.config: "/etc/logstash/elastiflow/conf.d/*.conf"

执行logstash系统脚本​/usr/share/logstash/bin/system-install​启动logstash并设置开机自启

systemctl daemon-reload
systemctl enable logstash
systemctl start logstash.service

7、其他测试项

systemctl stop firewalld.service
systemctl disable firewalld.service

# 临时禁用selinux
setenforce 0

测试es:curl http://0.0.0.0:9200

ELK ElasticFlow数据流分析图和部署

检查端口:netstat -antupl |grep java

ELK ElasticFlow数据流分析图和部署

检查日志:tailf /var/log/logstash/logstash-plain.log

ELK ElasticFlow数据流分析图和部署

elasticflow各协议的端口查看:

grep -i sflow /etc/systemd/system/logstash.service.d/elastiflow.conf    # 只查看sflow相关端口和配置

ELK ElasticFlow数据流分析图和部署

8、kibana配置

浏览器打开http://{此处输入部署服务器地址}:5601

ELK ElasticFlow数据流分析图和部署

导入第六步kibana模板

ELK ElasticFlow数据流分析图和部署

高级设置

ELK ElasticFlow数据流分析图和部署

过滤器是否默认具有全局状态(被固定)

ELK ElasticFlow数据流分析图和部署

URL有时会变得太大,一些浏览器无法处理。为了解决这个问题,我们正在测试将部分URL存储在会话存储中是否有帮助

ELK ElasticFlow数据流分析图和部署

三、交换机配置(H3C)

ELK ElasticFlow数据流分析图和部署

# 全局sflow配置
sflow agent ip X.X.X.X #此处我不确定配置网关接口是否可行,实际情况我通过loopback来实现,比较安全
sflow source ip X.X.X.X #可选项,缺省情况下,设备使用路由决定的源IP地址作为sFlow报文的源IP地址
sflow collector 1 ip X.X.X.X{应为ElasticFlow地址} description "TestCollector"
sflow sampling-rate 1000

# 接口下应用sflow配置(int G&H&X&T)
sflow flow collector 1
sflow sampling-rate 1000
sflow counter collector 1
sflow counter interval 60

# 查看sflow状态
<H3C>dis sflow

ELK ElasticFlow数据流分析图和部署