Flume 读取JMS 消息队列消息,并将消息写入HDFS

时间:2023-03-09 20:45:15
Flume 读取JMS 消息队列消息,并将消息写入HDFS

利用 Apache Flume 读取 JMS 消息队列中的消息,并将消息写入 HDFS。Flume agent 的配置如下:

flume-agent.conf

# Declare the source, sink and channel that make up the agent "agentHdfs"
agentHdfs.sources = jms_source
agentHdfs.sinks = hdfs_sink
agentHdfs.channels = mem_channel



  #  Describe/configure the source



 agentHdfs.sources.jms_source.type  = jms

# Bind to all interfaces

agentHdfs.sources.jms_source.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory

agentHdfs.sources.jms_source.connectionFactory = ConnectionFactory

agentHdfs.sources.jms_source.destinationName = BUSINESS_DATA  #AMQ queue

agentHdfs.sources.jms_source.providerURL = tcp://hadoop-master:61616

agentHdfs.sources.jms_source.destinationType = QUEUE





# Describe the sink: write events to HDFS as plain text
agentHdfs.sinks.hdfs_sink.type = hdfs
# Target directory, bucketed by day and hour through time escape sequences
agentHdfs.sinks.hdfs_sink.hdfs.path = hdfs://hadoop-master/data/flume/%Y-%m-%d/%H
# NOTE(review): %{hostname} is substituted from an event header named "hostname";
# nothing in this configuration sets that header - confirm the JMS messages carry it.
agentHdfs.sinks.hdfs_sink.hdfs.filePrefix = %{hostname}/events-
agentHdfs.sinks.hdfs_sink.hdfs.maxOpenFiles = 5000
# Events written per flush; the channel's transactionCapacity must be at least this large
agentHdfs.sinks.hdfs_sink.hdfs.batchSize = 500
# DataStream + Text: write the raw event bodies as an uncompressed stream
agentHdfs.sinks.hdfs_sink.hdfs.fileType = DataStream
agentHdfs.sinks.hdfs_sink.hdfs.writeFormat = Text
# File rolling: size-based rolling is disabled (0); roll after 1000000 events or 600 seconds
agentHdfs.sinks.hdfs_sink.hdfs.rollSize = 0
agentHdfs.sinks.hdfs_sink.hdfs.rollCount = 1000000
agentHdfs.sinks.hdfs_sink.hdfs.rollInterval = 600
# Resolve the time escapes in hdfs.path from the agent's local clock instead of
# an event "timestamp" header
agentHdfs.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true





# Use a channel which buffers events in memory
agentHdfs.channels.mem_channel.type = memory
# Maximum number of events held in the channel
agentHdfs.channels.mem_channel.capacity = 1000
# Maximum number of events per transaction. Must be at least the HDFS sink's
# hdfs.batchSize (500), because the sink takes a whole batch in one transaction.
agentHdfs.channels.mem_channel.transactionCapacity = 500



# Wire the source and the sink to the channel
# (the source key is "channels", the sink key is "channel")
agentHdfs.sources.jms_source.channels = mem_channel
agentHdfs.sinks.hdfs_sink.channel = mem_channel