elk学习-logstash学习笔记

时间:2023-04-03 12:03:50

概念

logstash是一个开源的,服务端的数据处理pipeline(管道),它可以接收多个源的数据,然后对他们进行转换,最终将他们发送到指定的目的类型。它通过插件机制实现各种功能,可以在https://github.com/logstash-plguins下载各种功能的插件,可以自行编写插件。

参考官方文档。https://www.elastic.co/guide/en/logstash/6.3/introduction.html

工作原理

elk学习-logstash学习笔记

logstash实现的主要功能分为接收数据、解析过滤并转换数据,输出数据三个部分,对应的插件依次是input插件,filter插件,output插件,其中filter插件是可选的,其他的两个插件是必须的插件。

常用的input

input插件主要用于接收数据,logstash支持接收多种数据源,常用的有以下几种:

  • file

读取一个文件,类似linux下tail命令,一行行的实时读取

  • syslog

监听514端口的syslog message,并使用RFC3164的格式进行解析

  • redis

从redis服务器读取数据

  • kafka,

从kafka集群中读取数据,kafka加logstash架构一般用在数据量较大的业务场景,kafka作为数据的缓冲和存储。

  • filebeat

filebeat是文本日志收集器,性能稳定,并且占用系统资源很少

常用filter

filter插件主要用户数据的过滤,解析和格式化,也就是将非结构化的数据解析成结构化的,可查询的标准化数据,常用有以下几种:

  • grok

grok是logstash最重要的插件,可以解析并结构化任意的数据,支持正则表达式,并提供了很多内置的规则和模板,此插件用的最多,但是也最复杂

  • mutate

此插件提供丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等

  • date

处理转换日志记录中的时间字符串

  • GeoIP

根据IP地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用

常用的output

output插件用于数据的输出,一个logstash事件可以穿过多个output,直到所有的output处理完毕,这个事件才算结束。常见的output有如下几种

  • elasticsearch

发送数据到elasticsearch中

  • file

发送数据到文件中

  • redis

发送数据到redis中,其中redis插件既可以用在input插件,也可以用在output插件中

  • kafka

发送数据到kafka中

安装

logstash需要安装java运行环境,可以从官网获取安装包,https://www.elastic.co/cn/downloads/logstash

#logstash 6.3使用java 8或者java 9
#安装java下载合适的java版本
sudo mkdir /usr/java
#jav安装到/usr/java目录下
tar -zxvf jdk-8u151-linux-x64.tar.gz -C /usr/
mv /usr/jdk1.8.0_151/ /usr/java


#设置jdk环境变量
echo "export JAVA_HOME=/usr/java/jdk1.8.0_151" >> /etc/profile
echo "export PATH=$PATH:$JAVA_HOME/bin" >> /etc/profile
echo "export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$CLASSPATH" >> /etc/profile
source /etc/profile

#采用源码安装,版本为6.3.2
tar -zxvf logstash-6.3.2.tar.gz -C  /usr/local
mv /usr/local/logstash-6.3.2 /usr/local/logstash

安装完毕后,检查java版本

elk学习-logstash学习笔记


配置文件说明

以tar格式下载的源码举例,以下是程序目录结构

├── bin  																#二进制程序执行路径,启动以及插件安装
├── config														  #配置文件
├── CONTRIBUTORS
├── data
├── Gemfile
├── Gemfile.lock
├── lib
├── LICENSE.txt
├── logstash-core
├── logstash-core-plugin-api
├── modules
├── NOTICE.TXT
├── tools
├── vendor
└── x-pack

以下是常用的配置文件中的说明

├── jvm.options        			#包含jvm配置,该文件可以设置初始化和最大的堆空间使用的值
├── log4j2.properties			  #针对log4j2日志库的配置选项
├── logstash.yml            #logstash的配置,也可以通过命令配置
├── pipelines.yml           #包含了在一个单logstash示例中运行多个管道的框架和结构配置
└── startup.options         #包含 /usr/share/logstash/bin 中的系统安装脚本用于为系统构建相应启动脚本的选项

logstash.yml配置文件选项说明

node.name          					#配置节点名称
path.data									  #logstash以及相关插件的持久化数据保存路径
pipeline.id 							  #pepline管道的id
pipeline.workers					  #将并行执行管道的筛选器和输出阶段cpu核心数
pipeline.batch.size         #单个工作线程在尝试执行其筛选器和输出之前将从输入收集的最大事件数
log.level										#日志等级
log.format								  #日志格式
path.logs                   #logstash自身日志保存路径
path.plugins                #自定义插件保存路径


运行以及常用配置举例

运行logstash

cd /usr/local/logstash/
bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

运行命令参数解释

  • -f 代表指定配置文件
  • -e 代表执行,
  • input 输入,里面是输入的方式,此处选择stdin,就是标准输入,从终端输入
  • output输出,输出方式,此处选择stdout,标准输出,输出到终端
  • codec 插件,表明格式,放在stdout中,表示输出的格式,rubdebug是专门用来测试的格式,一般用在终端输出json格式
  • -t参数检查配置参数

运行测试

在上面运行后,输出hello word,可以看到日志输出结果


{
          "host" => "localhost.localdomain",
      "@version" => "1",
       "message" => "\\hell workd",
    "@timestamp" => 2023-04-02T11:53:25.747Z
}

elk学习-logstash学习笔记

logstash会在输出内容中给事件体检一些额外信息,比如version,host,timestamp等新增字段,而最重要的是timestamp字段,用来标记事件发生的时间。由于这个字段涉及到logstash内部流转,如果给他一个字符也是timestamp命名的话,logstash就会直接报错。另外也不能删除这个字段。

在logstash输出中,常见的字段还有type,表示事件的唯一类类型,tags表示事件的某方面属性,这些字段可以随意删除或者添加。

使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里面,就是logstash配置文件。以下是一个简单的配置文件示例

input{
sdtin{}
}

output{
stdout {code=>rubydebug}
}

#按照如下方式启动
/bin/logstash -f logsatsh-simple.conf
#后台启动
nohup bin/logstash -f logstash-simple.conf &

配置文件举例

配置举例1

#logstash-simple2.conf

input {

file {
path =>"/var/log/messages"
}
}
output {
stdout {
codec =>rubydebug
}

#检查配置参数
bin/logstash -f -t loststash-simple2.conf -t
#运行
nohup bin/logstash -f logstash-simple2.conf &

input插件定义了输入源为file,然后指定了文件的路径为/var/log/message,就是将此文件内容作为输入源,此处path是必填选项配置,后面的路径必须是绝对路径,不能是相对路径,如果要监控多个文件,可以通过逗号分隔。例如

path=>["var/log/*.log","/var/log/mesage","/var/log/secure"]

对于output插件,这里任然使用rubydebug的json输出格式、关闭一个进程,测试输出

systemctl stop firewalld 
cat /var/log/message  #查看linux系统日志
cat nohup.out         #查看logstash输出日志

elk学习-logstash学习笔记

以下是logstash收集到信息


          "path" => "/var/log/messages",
      "@version" => "1"
}
{
          "host" => "localhost.localdomain",
       "message" => "Apr  2 20:47:09 localhost firewalld[105087]: WARNING: AllowZoneDrifting is enabled. This is considered an insecure configuration option. It will be removed in a future release. Please consider disabling it now.",
    "@timestamp" => 2023-04-02T12:47:10.735Z,
          "path" => "/var/log/messages",
      "@version" => "1"
}

这就是json格式输出的内容,可以看到输入的内容放到了message字段中,保持原样输出了,还增加了思科字段,是logstash自动添加·上去的


配置举例2

以下是配置输出到kafka的配置

input {
file {
path => "/var/log/messages"
}
}

output {
kafka {
bootstrap_servers =>
"172.16.213.51:9092,172.16.213.75:9092,172.16.213.109:9092"
topic_id => "osmessages"
}

这里的输入源是还是/varlog/messages,output输出定义了kafka,通过bootstrap_server选项指定了kafka集群的IP地址和端口,每个ip通过逗号分隔,另外topic_id是指定输出到kafka中的哪个topic选项下,这里是osmessages,如果没有此topic,会自动创建topic。整个配置文件的含义是将系统中/var/log/messagres字段和内容实时同步到kafka集群名为osmessage的topic下。


配置举例3

可以将logstash配置为一个转发节点,输出到eslaticsearch 上。

input {
kafka {
bootstrap_servers =>
"172.16.213.51:9092,172.16.213.75:9092,172.16.213.109:9092"
topics => ["osmessages"]
codec => "json"
}
}

output {
elasticsearch {
hosts => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
index => " osmessageslog-%{+YYYY-MM-dd}"
}
}

以上含义,将logstash作为一个二级转发节点,收集kafka的消息,输出到eslaticsearch上。通过bootstrap_severs和topic两个选项指定了接收kafka属性的信息,因为logstash从kafka获取到的数据内容为json格式,所以需要在input字段加入codec=>json来进行解析。接着output输出类型配置为  easticsearch,并通过hosts选项指定es集群地址,最后通过index选项指定了idnex 索引名称。

elk学习-logstash学习笔记


logstash常用插件

input以及编码codec插件

基本语法

logstash的配置文件有如下三个部分组成,其中input,output部分是必须配置,filter部分是可选配置,而filter就是过滤插件

input{
输入插件
}
fiter{
过滤插件
}
output{
输出插件
}

读取文件file

input {
    file {
        path => ["/var/log/messages"]
        type => "system"
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}
  • path

要用作输入的文件的路径。您可以在此处使用文件名模式,例如 /var/log/*.log。如果使用类似 /var/log/**/*.log 的模式,则将对所有 *.log 文件执行 /var/log 的递归搜索。路径必须是绝对的,不能是相对的。

  • type

向此输入处理的所有事件添加一个类型字段。

  • discover_interval

logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。

  • exclude

不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。

  • sincedb_path

如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。

  • sincedb_write_interval

logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。

  • stat_interval

logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。

  • start_position

logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 “beginning”,logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F

标准输入

input{
    stdin{
        add_field=>{"key"=>"iivey"}
        tags=>["add1"]
        type=>"test1"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}
  • add_fieds

对一个事件增加一个字段

  • tags

向事件添加任意数量的标签。

  • type

向此输入处理的所有事件添加一个类型字段,可以用作后面kibana查找

syslog

input {
  syslog {
    port => "5514"
  }
}

output {
    stdout{
        codec=>rubydebug    
    }
}
  • port

指定syslog端口

TCP

input {
  tcp {
    port => "5514"
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}

output {
    stdout{
        codec=>rubydebug
    }
  • port

指定tcp端口

codec编码插件

codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等。

input{
    stdin {
    }
}
output{
    stdout {
         codec => "plain"
        }
}
  • plain

plain 是一个空的解析器,可以要用户自定指定格式,就是说输入是什么格式,输出就是什么格式

input {
    stdin {
        }
    }
output {
    stdout {
        codec => json
        }
  • json

将编码解析json格式,但是前提是收集的数据源最好也是json格式的

  • mutiline

有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

例如

input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}

其中patter代表匹配的正则表达式

negate,否定正则表达式模式(如果不匹配)。

 what ,如果模式匹配,事件是属于下一个事件还是上一个事件

fiter插件

Grok正则捕获

grok是一个十分强大的filter插件,可以通过正则解析任意的文本,将非结构化日志弄成结构化和方便查询的结构。他是目前解析非结构化日志数据中最好的方式。

官方有关于grok编写的最佳案例,详细参考https://github.com/elastic/logstash/tree/v1.4.2/patterns

语法规则是

%{语法:语义}

匹配举例

#原始日志信息
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
#匹配的表达式
%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}

匹配表达中使用转义字符\,转义空格和中括号,将以上的原始日志信息用5个表达份表示

  • %{IP:clientip}

匹配结果clinetip:172.16.213.132,

  • \[%{HTTPDATE:timestamp}\]\  

匹配结果timestampe [07/Feb/2018:16:24:19 +0800]

  • %{QS:referrer}\

匹配结果"GET / HTTP/1.1

  • %{NUMBER:response}\

匹配结果403 

  • %{NUMBER:bytes}

匹配结果5039

logstash使用以上IP,HTTPDATE,QS,NUMBER等匹配模式字段来代替复杂的正则正则表达式。如下所示:

匹配模式              正则定义规则
NUMBER           (?:%{BASE10NUM})
HTTPDATE          %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
IP                (?:%{IPV6}|%{IPV4})
QS                %{QUOTEDSTRING}

logstash默认提供了近200个匹配模式,在目录/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns中可以查看,官网也有定义好的内容,https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns

当然,grok支持自定义正则表达式,语法如下

 ?<field_name>the pattern here)  #类似python的正则编写方式
 #举例
 (?<queue_id>[0-9A-F]{10,11}) #

以下是一个实际使用grok插件编写的配置文件

 input {
    stdin {}
}
filter {
    grok {
        match =>{ "message" =>"%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
        remove_field =>[ "message" ]
   }
date {
        match =>["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
mutate {
            remove_field =>["timestamp"]
        }
}
output {
    stdout {
        codec =>"rubydebug"
    }
}

grok插件中选项:

1 match 匹配表达式,hash类型,{}

2 remove_field 删除字段,以上message匹配输出带有message信息且带有完整的信息,重复了

date插件,主要用来转换日志记录中时间字符串,编程logstash timestamp对象,然后转储到timestamp字段里面。一般原始日志中timestamp默认情况下是显示当前时间,而elasticsearch中也会使用timestamp,两者的时间会冲突共发生混乱。因此需要使用date插件。

mutate,重命名、移除、替换和修改事件中的字段。在使用date插件后,原始日志的timestamp就没有必要继续存在,使用mutate剔除原始日志中的timestamp字段。

date插件

使用date插件的原因,一方面是logastash会给收集到的每条日志自动打上时间戳timestamp,但是这个时间是input接收数据的时间,而不是日志生成的时间,会导致搜索数据时产生混乱;另外一方面,在codec编码格式输出中数据采用UTC时间,与我们时区并不一致。

以下时date插件的时间格式化含义

elk学习-logstash学习笔记


mutate数据修改

filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。

convert 字段类型转换

以设置的转换类型包括:”integer”,”float” 和 “string

filter {
    mutate {
        convert => ["request_time", "float"]
    }
}

gsub正则表达式替换匹配字段

通过正则表达式替换字段中匹配的值,仅对字符串类型字段有效。

   gsub => ["urlparams", "/", "_"]

表示把urlparams字段中的/字符替换为"_"

split分隔字符串为数组

filter {
    mutate {
        split => ["message", "|"]
    }
}

表示将message字段以|符号分隔为数组

rename重命名

重命名某个字段,如果目的字段已经存在,会被覆盖掉

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}

remove_field删除字段

删除某个字段

remove_filed =>["timestamp"]

filter插件配置举例

原始数据

2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202 

logstash配置如下

input {
    stdin {}
}
filter {
    grok {
        match =>{ "message" =>"%{TIMESTAMP_ISO8601:localtime}\|\~\|%{IPORHOST:clientip}\|\~\|(%{GREEDYDATA:http_user_agent})\|\~\|(%{DATA:http_referer})\|\~\|%{GREEDYDATA:mediaid}\|\~\|%{GREEDYDATA:osid}" }
       remove_field =>[ "message" ]
   }
date {
        match =>["localtime", "yyyy-MM-dd'T'HH:mm:ssZZ"]
        target =>"@timestamp"
    }
mutate {
           remove_field =>["localtime"]
        }
}
output {
    stdout {
        codec =>"rubydebug"
    }

配置含义

1 从终端stdin输入数据

2 将输入日志内容分为6个字段

3 删除message字段

4 将输入日志的时间字段信息转存到timestampe字段里

5 删除日志的时间段

6 将输入内容以rubydebug格式在终端输出stdout

运行后,最后输出的结果如下:

{
         "@timestamp" =>2018-02-09T02:57:42.000Z,
       "http_referer" =>"http://m.sina.cn/cm/ads_ck_wap.html",
           "clientip" =>"123.87.240.97",
           "@version" =>"1",
               "host" =>"logstashserver",
               "osid" =>"DF0184266887D0E",
            "mediaid" =>"1460709836200",
    "http_user_agent" =>"Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202 Safari/604.1"
}

output插件

stdout

最基础和简单的输出插件,

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}

file

保存到一个文件中

output {
    file {
        path => "/path/to/%{+yyyy/MM/dd/HH}/%{host}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}

elasticsearch

输出到elasticsearh,是最常用的output插件

 output {
    elasticsearch {
        host =>["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
        index =>"logstash-%{type}-%{+YYYY.MM.dd}"
        manage_template =>false
        template_name =>"template-web_access_log"
    }
}

选项含义

  • host

数组类型值,配置的是elasticsearch节点与端口,默认9200.

  • index

es的index索引名称,可以使用变量方式。在语法解析的时候,看到+开通的默认认为后面是时间格式。注意索引名称中不能有大写字母

  • manange_templagte

用来设置是否开启logstash自动管理模板功能

  • template_name

用来设置es中模板的名称


生产中logastash配置举例

收集kafka

input {
 kafka {
    bootstrap_servers => "tc-1:9092,tc-2:9092,tc-3:9092"
    topics => ["other-input-text"]
    type => "other-input-text"
    group_id => "new-other-input-text"
    #session_timeout_ms => "30000"
    #request_timeout_ms => "81000"
    #max_poll_records => "100"
    consumer_threads => 8
  }

 kafka {
    bootstrap_servers => "tc-2:9092,tc-1:9092,tc-3:9092"
    topics => ["classroom-input-text"]
    type => "classroom-input-text"
    group_id => "new-classroom-input-text"
    #max_poll_records => "100"
    consumer_threads => 8
  }

 kafka {
    bootstrap_servers => "tc-2:9092,tc-1:9092,tc-3:9092"
    topics => ["report-input-text"]
    type => "report-input-text"
    group_id => "new-report-input-text"
    #max_poll_records => "100"
    consumer_threads => 8
  }

 kafka {
    bootstrap_servers => "tc-2:9092,tc-1:9092,tc-3:9092"
    topics => ["action-input-text"]
    type => "action-input-text"
    group_id => "new-action-input-text"
    #max_poll_records => "100"
    consumer_threads => 8
  }

 kafka {
    bootstrap_servers => "172.26.5.79:9092"
    topics => ["svc_star"]
    type => "svc_star"
    group_id => "svc_star_pro_l"
    #max_poll_records => "100"
    consumer_threads => 3
  }

 kafka {
    bootstrap_servers => "172.26.5.79:9092"
    topics => ["quiz_info"]
    type => "quiz_info"
    group_id => "svc_star_pro_l"
    #max_poll_records => "100"
    consumer_threads => 3
  }

 kafka {
    bootstrap_servers => "172.26.5.79:9092"
    topics => ["quiz_submit"]
    type => "quiz_submit"
    group_id => "svc_star_pro_l"
    #max_poll_records => "100"
    consumer_threads => 3
  }

 kafka {
    bootstrap_servers => "172.26.5.79:9092"
    topics => ["classroom-final-star"]
    type => "classroom-final-star"
    group_id => "svc_star_pro_l"
    #max_poll_records => "100"
    consumer_threads => 3
  }


}

filter {
 if [type] == "other-input-text" or [type]=="classroom-input-text" or [type]=="report-input-text" or [type]=="action-input-text"{
      json {
           source => "message"
           remove_field => "message"
        }
  }

ruby {
        code => "event.set('index_date', event.get('[@timestamp]').time.localtime.strftime('%Y.%m.%d'))"
  }

 ruby {
        code => "event.set('index_date_hour', event.get('[@timestamp]').time.localtime.strftime('%H'))"
  }


}


output {
if [type] == "other-input-text" {
   elasticsearch {
     hosts => ["http://172.26.5.158:9200","http://172.26.5.161:9200","http://172.26.5.162:9200"]
     index => "other-input-text-%{index_date}"
   }
    file {
      path => "/data/upload_cos/other-input-text/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }

}

if [type] == "classroom-input-text" {
   elasticsearch {
     hosts => ["http://172.26.5.161:9200","http://172.26.5.162:9200","http://172.26.5.158:9200"]
     index => "classroom-input-text-%{index_date}"
   } 
    file {
      path => "/data/upload_cos/classroom-input-text/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }

}  

if [type] == "report-input-text" {
   elasticsearch {
     hosts => ["http://172.26.5.162:9200","http://172.26.5.158:9200","http://172.26.5.161:9200"]
     index => "report-input-text-%{index_date}"
   } 
    file {
      path => "/data/upload_cos/report-input-text/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }

}   

if [type] == "action-input-text" {
   elasticsearch {
     hosts => ["http://172.26.7.82:9200","http://172.26.5.161:9200","http://172.26.5.162:9200"]
     index => "action-input-text-%{index_date}"
   } 
    file {
      path => "/data/upload_cos/action-input-text/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }

}   

if [type] == "svc_star" {
    file {
      path => "/data/classroom_data/svc_star/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }   
}

if [type] == "classroom-final-star" {
    file {
      path => "/data/classroom_data/classroom-final-star/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }
}

if [type] == "quiz_info" {
    file {
      path => "/data/classroom_data/quiz_info/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }
}

if [type] == "quiz_submit" {
    file {
      path => "/data/classroom_data/quiz_submit/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }
}




}


input {
 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["nginx-sac"]
    type => "nginx-sac"
    group_id => "nginx-sac"
    #max_poll_records => "100"
    consumer_threads => 8
    codec => json {
                 charset => "UTF-8"
    }
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["nginx-h5"]
    type => "nginx-h5"
    group_id => "nginx-h5"
    #max_poll_records => "100"
    consumer_threads => 2
    codec => json {
        charset => "UTF-8"
    }
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["h5-app"]
    type => "h5-app"
    group_id => "h5-app"
    #max_poll_records => "100"
    consumer_threads => 1
    codec => json {
        charset => "UTF-8"
    }
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["nginx-omo"]
    type => "nginx-omo"
    group_id => "nginx-omo"
    #max_poll_records => "100"
    consumer_threads => 2
    codec => json {
                 charset => "UTF-8"
    }
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["nginx-opt"]
    type => "nginx-opt"
    group_id => "nginx-opt"
    #max_poll_records => "100"
    consumer_threads => 2
    codec => json {
                 charset => "UTF-8"
    }
  }


 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["nginx-xta"]
    type => "nginx-xta"
    group_id => "nginx-xta"
    #max_poll_records => "100"
    consumer_threads => 2
    codec => json {
                 charset => "UTF-8"
    }
  }

}
##############################################################
filter {
 if [type] == "nginx-sac" or [type] == "nginx-h5" or [type] =="h5-app" or [type] == "nginx-xta"  or [type] == "nginx-opt" or [type] =="nginx-omo" {
	json {
           source => "message"
           remove_field => ["message","[beat][name]","[beat][version]"]
        }
	geoip {
            source => "clientRealIp"
            fields => ["location", "country_name", "city_name","region_name"] 
        }
}


 ruby {
        code => "event.set('index_date', event.get('[@timestamp]').time.localtime.strftime('%Y.%m.%d'))"
  }
 ruby {
        code => "event.set('index_date_hour', event.get('[@timestamp]').time.localtime.strftime('%H'))"
  }


}
##############################################################
output {
if "nginx-api" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-api-%{index_date}"
   }
}
if "nginx-ms" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-ms-%{index_date}"
   }
}
if "nginx-roombox" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-roombox-%{index_date}"
   }
}

if "nginx-ppt" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "nginx-ppt-%{index_date}"
   }
}

if "nginx-h5-coursewaremgmt" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "nginx-h5-coursewaremgmt-%{index_date}"
   } 
}  

if "nginx-h5-icourseware" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "nginx-h5-icourseware-%{index_date}"
   } 
}  

if "h5-courseware" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "h5-courseware-%{index_date}"
   }
}

if "h5-html2pdf" in [tags] {
   elasticsearch { 
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "h5-html2pdf-%{index_date}"
   } 
}  

if "nginx-omo-ms" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "nginx-omo-ms-%{index_date}"
   }
}

if "nginx-opt-web" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "nginx-opt-%{index_date}"
   }
}

if [type] == "nginx-xta" {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "nginx-xta-%{index_date}"
   }
}

}

redis

input {
  redis {
	port => "6379"
	host => "172.27.0.48"
	db => 0
	data_type => "list"
	key => "java"
  } 

  redis {
        port => "6379"
        host => "172.27.0.48"
        db => 1
        data_type => "list"
        key => "nginx"
  }

  redis {
        port => "6379"
        host => "172.27.0.48"
        db => 2
        data_type => "list"
        key => "nodejs"
  }

}

filter {
  ruby {
        code => "event.set('index_date', event.get('[@timestamp]').time.localtime.strftime('%Y.%m.%d'))"
  }


 if  "nginx-stream" in [tags] {
      json {
           source => "message"
	   remove_field => ["message","[beat][name]","[beat][version]"]
        }
      	mutate {

		gsub => ["message", "\\x", "\\\x"]
	}

  }

 if "slb-java-bj" in [tags] or "slb-java-hk" in [tags]{
      json {
           source => "message"
           remove_field => ["[beat][name]","[beat][version]"]
        }
}  



 if "java-xxl-job-admin" in [tags]{
      grok {
              match => { "message" => "%{TIMESTAMP_ISO8601:log_date}\|%{GREEDYDATA:log_level}\|%{GREEDYDATA:log_thread}\|%{GREEDYDATA:log_logger}\|%{GREEDYDATA:log_file}\|%{GREEDYDATA:log_msg}"}
      }
       mutate {
                remove_field => ["message","[beat][name]","[beat][version]"]
        }
  }


 if "java-api-error" in [tags] or "java-schedule-error" in [tags] or "java-data-error" in [tags] or "java-stream-error" in [tags] or "java-ppt-to-h5-error" in [tags] or "java-omo-schedule-error" in [tags] or "java-omo-live-error" in [tags] or "java-api_login-error" in [tags] or "java-omo-login-error" in [tags]{
      grok {
              match => { "message" => "%{TIMESTAMP_ISO8601:log_date}\|%{GREEDYDATA:log_level}\|%{GREEDYDATA:log_thread}\|%{GREEDYDATA:log_class}\|%{GREEDYDATA:log_content}"}
         }
       mutate {
                remove_field => ["message","[beat][name]","[beat][version]"]
        }
  }

if  "java-api" in [tags] or "java-stream" in [tags] or "java-schedule" in [tags] or "java-ms" in [tags] or "java-data" in [tags] or "java-ppt-to-h5" in [tags] or "java-omo-schedule" in [tags] or "java-omo-live" in [tags] or "java-api_login" in [tags] or "java-omo-login" in [tags]{
      grok {
              match => { "message" => "%{TIMESTAMP_ISO8601:log_date}\|%{GREEDYDATA:log_level}\|%{GREEDYDATA:log_thread}\|%{GREEDYDATA:log_class}\|%{GREEDYDATA:log_content}"}
         }
       mutate {
                remove_field => ["message","[beat][name]","[beat][version]"]
        }
  }

}


output {
if "nginx-data" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-data-%{+YYYY.MM.dd}"
   }
}

if "nginx-roombox" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-roombox-%{+YYYY.MM.dd}"
   }
}

if "nginx-stream" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-stream-%{+YYYY.MM.dd}"
   }
}


if "nginx-h5-coursewaremgmt" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-h5-coursewaremgmt-%{+YYYY.MM.dd}"
   } 
}  

if "nginx-h5-icourseware" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "nginx-h5-icourseware-%{+YYYY.MM.dd}"
   } 
}  


if "slb-java-bj" in [tags] or "slb-java-hk" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "slb-java-%{+YYYY.MM.dd}"
   }
}


if [type] == "java-stream"{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-stream-%{index_date}"
   } 
}

if "java-api" in [tags] or "java-api-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-api-%{+YYYY.MM.dd}"
   } 
}  

if "java-api_login" in [tags] or "java-api_login-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-api_login-%{+YYYY.MM.dd}"
   } 
}  

if "java-data" in [tags] or "java-data-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-data-%{+YYYY.MM.dd}"
   }
}

if "java-ppt-to-h5" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-ppt-to-h5-%{+YYYY.MM.dd}"
   } 
}  

if "java-omo-live" in [tags] or "java-omo-live-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-omo-live-%{+YYYY.MM.dd}"
   }
}

if "java-omo-login" in [tags] or "java-omo-login-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.26.6.82:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-omo-login-%{+YYYY.MM.dd}"
   }
}


if "java-omo-schedule" in [tags] or "java-omo-schedule-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-omo-schedule-%{+YYYY.MM.dd}"
   }
}

if "java-omo-livekafka" in [tags] or "java-omo-livekafka-error" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-omo-livekafka-%{+YYYY.MM.dd}"
   }
}


if "java-xxl-job-admin" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-xxl-job-admin-%{+YYYY.MM.dd}"
   }
}

if "h5-courseware" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "h5-courseware-%{+YYYY.MM.dd}"
   }    
}  

if "h5-html2pdf" in [tags] {
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "h5-html2pdf-%{+YYYY.MM.dd}"
   }
}


}


java

input {
 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-api"]
    type => "java-api"
    group_id => "java-api"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-login"]
    type => "java-login"
    group_id => "java-login"
    #max_poll_records => "100"
    consumer_threads => 1
  }


 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-ms"]
    type => "java-ms"
    group_id => "java-ms"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-class"]
    type => "java-class"
    group_id => "java-class"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-stream"]
    type => "java-stream"
    group_id => "java-stream"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-schedule"]
    type => "java-schedule"
    group_id => "java-schedule"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-ppt-to-h5"]
    type => "java-ppt-to-h5"
    group_id => "java-ppt-to-h5"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-xxl-job"]
    type => "java-xxl-job"
    group_id => "java-xxl-job"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-internal-api"]
    type => "java-internal-api"
    group_id => "java-internal-api"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-update-cache"]
    type => "java-update-cache"
    group_id => "java-update-cache"
    #max_poll_records => "100"
    #consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-double-write"]
    type => "java-double-write"
    group_id => "java-double-write"
    #max_poll_records => "100"
    #consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-dc-api"]
    type => "java-dc-api"
    group_id => "java-dc-api"
    #max_poll_records => "100"
    #consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-omo-login"]
    type => "java-omo-login"
    group_id => "java-omo-login"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-omo-livekafka"]
    type => "java-omo-livekafka"
    group_id => "java-omo-livekafka"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-omo-openapi"]
    type => "java-omo-openapi"
    group_id => "java-omo-openapi"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-omo-live"]
    type => "java-omo-live"
    group_id => "java-omo-live"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-omo-schedule"]
    type => "java-omo-schedule"
    group_id => "java-omo-schedule"
    max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-opt-backend"]
    type => "java-opt-backend"
    group_id => "java-opt-backend"
    #max_poll_records => "100"
    consumer_threads => 1
  }

 kafka {
    bootstrap_servers => "172.26.5.83:9092"
    topics => ["java-xta"]
    type => "java-xta"
    group_id => "java-xta"
    #max_poll_records => "100"
    consumer_threads => 1
  }

}

filter {
  ruby {
        code => "event.set('index_date', event.get('[@timestamp]').time.localtime.strftime('%Y.%m.%d'))"
  }
  ruby {
        code => "event.set('index_date_hour', event.get('[@timestamp]').time.localtime.strftime('%H'))"
  }

 if [type] == "java-stream"  or [type] == "java-schedule" or [type] == "java-dc-api" or [type] =="java-omo-login" or [type] =="java-internal-api" or [type] =="java-ppt-to-h5" or [type] =="java-update-cache" or [type]=="java-double-write" or [type]=="java-omo-livekafka" or [type]=="java-omo-openapi" or [type]=="java-omo-schedule" or [type]=="java-opt-backend" or [type]=="java-xta" {
        json {
           source => "message"
           #remove_field => ["message","[beat][name]","[beat][version]"]
        }
        dissect {
        mapping => { "message" => "%{log_date}|%{log_level}|%{log_thread}|%{log_class}|%{log_content}"}
        remove_field => ["message","[beat][name]","[beat][hostname]","[beat][version]"]
        }

     date {
        match => ["log_date", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
     }
}

#add track_id
 if [type] == "java-api" or [type] == "java-ms" or [type] == "java-class" or [type] == "java-login" or [type]=="java-omo-live"{
        json {
           source => "message"
           #remove_field => ["message","[beat][name]","[beat][version]"]
        }
        dissect {
	mapping => { "message" => "%{log_date}|%{log_level}|%{log_thread}|%{track_id}|%{log_class}|%{log_content}"}
        remove_field => ["message","[beat][name]","[beat][hostname]","[beat][version]"]
        }

     date {
        match => ["log_date", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
     }
}


 if [type] == "java-xxl-job"{
   json {
           source => "message"
        }
        dissect {
        mapping => { "message" => "%{log_date}|%{log_level}|%{log_thread}|%{log_logger}|%{log_file}|%{log_msg}"}
        remove_field => ["message","[beat][name]","[beat][hostname]","[beat][version]"]
        }

     date {
        match => ["log_date", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
     }
}

###fiter end###
}

output {
if [type] =="java-api"{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-api-%{index_date}"
   } 
}  

if [type] =="java-login"{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "java-login-%{index_date}"
   } 
}  

if [type] == "java-stream"{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-stream-%{index_date}"
   } 
}

if [type] =="java-ms"{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-ms-%{index_date}"
   } 
}  

if [type] =="java-class"{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "java-class-%{index_date}"
   }
}

if "java-schedule" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-schedule-%{index_date}"
   } 
}  

if "java-ppt-schedule" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-schedule-%{index_date}"
   }
}


if "java-quiz-data-consumer" in [tags]{
   elasticsearch {
     hosts => ["http://172.27.0.18:9200","http://172.27.0.40:9200","http://172.27.0.45:9200"]
     index => "java-quiz-data-consumer-%{index_date}"
   }
}

if [type] == "java-internal-api"{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "java-internal-api-%{index_date}"
   } 
}

if [type] == "java-update-cache"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.18:9200","http://172.27.0.40:9200"]
     index => "java-update-cache-%{index_date}"
   } 
}    

if [type] == "java-double-write"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.18:9200","http://172.27.0.40:9200"]
     index => "java-double-write-%{index_date}"
   }
}

if [type] == "java-dc-api"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.18:9200","http://172.27.0.40:9200"]
     index => "java-dc-api-%{index_date}"
   } 
}  

if [type] =="java-xxl-job"{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "java-xxl-job-admin-%{index_date}"
   } 
}  

if [type] == "java-ppt-to-h5"{
   elasticsearch {
     hosts => ["http://172.27.0.40:9200","http://172.27.0.18:9200","http://172.27.0.45:9200"]
     index => "java-ppt-to-h5-%{index_date}"
   } 
}  

if [type] == "java-omo-login"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-omo-login-%{index_date}"
   } 
   file {
      path => "/data/upload_cos/java-omo-login/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }
}  

if [type] == "java-omo-livekafka"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-omo-livekafka-%{index_date}"
   } 
   file {
      path => "/data/upload_cos/java-omo-livekafka/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   } 
}

if [type] == "java-omo-openapi"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-omo-openapi-%{index_date}"
   }
   file {
      path => "/data/upload_cos/java-omo-openapi/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   } 
}

if [type] == "java-omo-live" {
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-omo-live-%{+YYYY.MM.dd}"
   }
   file {
      path => "/data/upload_cos/java-omo-live/date=%{index_date}/%{index_date_hour}.gz"
      gzip => true
   }
}

if [type] == "java-omo-schedule" {
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-omo-schedule-%{+YYYY.MM.dd}"
   }
}

if [type] == "java-opt-backend"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-opt-backend-%{index_date}"
   }
}

if [type] == "java-xta"{
   elasticsearch {
     hosts => ["http://172.27.0.45:9200","http://172.27.0.40:9200","http://172.27.0.18:9200"]
     index => "java-xta-%{index_date}"
   }
} 

}