【Flume NG用户指南】(2)构造

时间:2022-02-05 03:07:40
作者:周邦涛(Timen)

Email:zhoubangtao@gmail.com

转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575

上一篇请參考【Flume NG用户指南】(1)设置

3. 配置

前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的。

3.1 定义数据流

要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sink连接起来。你须要列出给定Agent的Source、Sink和Channel。一个Source能够指定多个Channel,可是一个Sink仅仅能指定一个Channel。格式例如以下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2> # set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

比如,一个叫做agent_foo的Agent从一个外部的Avroclient读取数据,然后通过Memory Channel将数据发送到HDFS上。

配置文件例如以下:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1 # set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1 # set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

这个配置将会使Event通过一个叫mem-channel-1的Memory Channel从avro-AppSrv-source流向hdfs-Cluster1-sink。当Agent使用此配置文件启动的时候。它就会实例化这个数据流。

3.2 配置各自的组件

定义数据流之后,你须要设置每个Source、Sink和Channel的属性。属性位于每个组件类型配置的层次命名空间下。

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

Flume的每个组件都须要设置“type”属性,以便理解究竟须要的是那种组件对象。每个Source、Sink和Channel类型都有它自己的属性集。这些属性都须要依据须要设置。就像前边的一个通过mem-channel-1的Memory Channel从avro-AppSrv-source流向hdfs-Cluster1-sink的数据流的样例。下边是这些组件配置的样例:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1 # set channel for sources, sinks # properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000 # properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100 # properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata #...

3.3 一个Agent中加入多个数据流

一个简单的Flume Agent能够包括多个独立的数据流配置。你能够在配置中列出多个Source、Sink和Channel。这些组件能够互联起来形成数据流:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

之后你就能够连接Source和Sink到它们各自的Channel上,从而形成不同的数据流。比如。假设你须要在一个Agent中设置两个数据流,一个从外部Avroclient流向外部HDFS,还有一个从tail的输出流向Avro Sink。下边就是实现这么个数据流的配置:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2 # flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 # flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

3.4 配置一个多Agent数据流

为设置一个多层的数据流,你须要在第一跳上使用一个Avro/Thrift Sink指向下一跳的Avro/Thrift Source。这就会使第一个Flume Agent传输Event到下一个Flume Agent。比如,假设你在使用Avroclient周期性的发送文件(每一个Event包括一个文件)到本地Flume Agent,然后本地Agent又把Event传输到还有一个和终于存储系统挂载的Flume Agent上。

看下边的样例

Weblog Agent配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel # define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel # avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000 # configure other pieces
#...

HDFS Agent配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel # define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel # avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000 # configure other pieces
#...

这里我们连接Weblog Agent的avro-forward-sink到HDFS Agent的avro-collection-source。这就会使来自外部appserver Source的Event终于存储到HDFS中。

3.5 扇出数据流

前边的章节已经讨论过,Flume支持从一个Source扇出数据到多个Channel中。有两种扇出模式:复制(replicating)和多路(multiplexing)。

在复制流中。Event被发送到全部配置Channel中。

在多路流中。Event被发送到一个特定的子集。为扇出数据流。你须要为这个Source指定一个Channel列表以及扇出的策略。这是通过加入一个Channel “selector”(能够是复制或者多路)实现的。然后假设多路的模式,还需进一步指定选择规则。假设不指定“selector”。默认是复制:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating

多路选择另一些其它的配置来使数据流分叉。这就须要指定一个Event属性到一个Channel集合的映射。

选择器会检查Event Header上配置的每个属性。假设它匹配了指定的值,那么这个Event就会被发送到跟这个值映射的全部Channel上。

假设没有不论什么匹配。这个Event就会被发送到发送到默认配置的Channel集合上:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#... <Agent>.sources.<Source1>.selector.default = <Channel2>

为每一个值的映射的Channel集合同意重叠。

下边的样例是一个单一流,它多路输出到两个路径上。名叫agent_foo的Agent有一个单一的Avro Source和两个Channel连接到两个Sink上:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2 # set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2 # set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2 # channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

多路选择器检查Event中一个叫做“State”的Header属性。假设它的值是“CA”,这个Event就会被发送到mem-channel-1,假设它是“AZ”,这个Event就会被发送到file-channel-2,或者假设它是“NY”。那么这个Event就会被发送到这两个Channel上。假设“State” Header属性没有设置或者没有匹配上以上3个的不论什么一个,这个Event就被发送到mem-channel-1上,它是默认的。

多路选择器也支持可选的Channel。为了为Header指定可选的Channel,“optional”配置參数须要像下边的方式一样使用:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

多路选择器将会首先尝试向必需的Channel上写入。即使这些必需的Channel中有不论什么一个没有成功消费Event,这整个事务将会失败。

事务将会在全部的必需的Channel上重试,一旦多有必需的Channel都成功的消费了Event,多路选择器才会尝试向可选的Channel上写入。而且可选的Channel中有不论什么消费Event失败的。Flume也会简单忽略它而且不会重试。

假设一个特殊Header的可选Channel集合和必选Channel集合有重叠的,那么这些Channel就被觉得是必选的,那自然在这些Channel的失败会导致全部Channel的重试。比如。上边的样例中。“CA” Header相应的mem-channel-1就被觉得是必选的Channel。虽然它同一时候被标记为必选和可选的,对这个Channel的写入失败将会导致跟这个选择器关联的全部Channel上重试。

注意假设一个Header并没有配置不论什么必选的Channel,那么这个Event将会被写入默认的Channel上。而且将会尝试写入到跟这个Header关联的可选Channel上。

假设指定了可选的Channel,而没有指定必选的Channel,依旧会导致Event被发送到默认的Channel上。假设没有Channel被指定为默认的而且也没有必选的,选择器会尝试将Event写入到可选的Channel中。

这样的情况下。不论什么的失败都会被简单忽略而且不在重试。

下一篇请參考【Flume NG用户指南】(3)Flume Sources

作者:周邦涛(Timen)

Email:zhoubangtao@gmail.com

转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575