Collecting FTP-uploaded files into Hadoop with Flume

Date: 2024-05-18 22:45:13

Flume server environment

CentOS 7.2

JDK 1.8

Flume 1.8

Hadoop platform environment

CentOS 7.2

Ambari 2.6.1

HDP 2.6.4

JDK 1.8

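I. FTP installation and configuration

Reference: https://blog.****.net/qq_39160721/article/details/80250975

II. Flume installation and configuration

Reference: https://blog.****.net/qq_39160721/article/details/80255194

III. Create the Flume configuration file

Create the flumeftp.conf configuration file: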

# Name the components on this agent

ftpagent.sources= busS metroS busGPSS 

ftpagent.sinks= busK metroK busGPSK

ftpagent.channels= busC metroC busGPSC

#Describe/configure the source

#bus info

ftpagent.sources.busS.channels= busC

ftpagent.sources.busS.type= spooldir

ftpagent.sources.busS.spoolDir= /usr/local/ftpdir/busInfo

ftpagent.sources.busS.fileHeader= true

#metroinfo

ftpagent.sources.metroS.channels= metroC

ftpagent.sources.metroS.type= spooldir

ftpagent.sources.metroS.spoolDir= /usr/local/ftpdir/metroinfo

ftpagent.sources.metroS.fileHeader= true

#bus GPS

ftpagent.sources.busGPSS.channels= busGPSC

ftpagent.sources.busGPSS.type= spooldir

ftpagent.sources.busGPSS.spoolDir= /usr/local/ftpdir/busGPS

ftpagent.sources.busGPSS.fileHeader= true

 

# Use channels to buffer events (memory for busC and metroC, file for busGPSC)

#bus info

ftpagent.channels.busC.type= memory

ftpagent.channels.busC.capacity=1000

ftpagent.channels.busC.transactionCapacity=100

#metroinfo

ftpagent.channels.metroC.type= memory

ftpagent.channels.metroC.capacity=1000

ftpagent.channels.metroC.transactionCapacity=100

#bus GPS

ftpagent.channels.busGPSC.type= file

ftpagent.channels.busGPSC.checkpointDir= /usr/local/flumeftp/checkpoint

ftpagent.channels.busGPSC.dataDirs= /usr/local/flumeftp/data

#Describe the sink

#bus info

ftpagent.sinks.busK.channel= busC

ftpagent.sinks.busK.type= hdfs

ftpagent.sinks.busK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/busInfo

ftpagent.sinks.busK.hdfs.writeFormat= Text

ftpagent.sinks.busK.hdfs.fileType= DataStream

ftpagent.sinks.busK.hdfs.rollInterval= 10

ftpagent.sinks.busK.hdfs.rollSize= 0

ftpagent.sinks.busK.hdfs.rollCount= 0

ftpagent.sinks.busK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S

ftpagent.sinks.busK.hdfs.useLocalTimeStamp= true

#metroinfo

ftpagent.sinks.metroK.channel= metroC

ftpagent.sinks.metroK.type= hdfs

ftpagent.sinks.metroK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/metroinfo

ftpagent.sinks.metroK.hdfs.writeFormat= Text

ftpagent.sinks.metroK.hdfs.fileType= DataStream

ftpagent.sinks.metroK.hdfs.rollInterval= 10

ftpagent.sinks.metroK.hdfs.rollSize= 0

ftpagent.sinks.metroK.hdfs.rollCount= 0

ftpagent.sinks.metroK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S

ftpagent.sinks.metroK.hdfs.useLocalTimeStamp= true

#bus GPS

ftpagent.sinks.busGPSK.channel= busGPSC

ftpagent.sinks.busGPSK.type= hdfs

ftpagent.sinks.busGPSK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/busGPS

ftpagent.sinks.busGPSK.hdfs.writeFormat= Text

ftpagent.sinks.busGPSK.hdfs.fileType= DataStream

ftpagent.sinks.busGPSK.hdfs.rollInterval= 10

ftpagent.sinks.busGPSK.hdfs.rollSize= 0

ftpagent.sinks.busGPSK.hdfs.rollCount= 0

ftpagent.sinks.busGPSK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S

ftpagent.sinks.busGPSK.hdfs.useLocalTimeStamp= true
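
The spooldir sources above expect each spoolDir to exist before the agent starts, and on Linux the path is case-sensitive. As a minimal sketch, the shell helper below reads a properties file, pulls out every spoolDir value, and reports whether that directory is present. The function name check_spool_dirs is made up for this post and is not part of Flume; the sketch assumes one key=value pair per line and a POSIX shell with grep and sed.

```shell
# Hypothetical helper (not part of Flume): report whether each spoolDir
# named in a Flume properties file exists on the local file system.
check_spool_dirs() {
    conf_file="$1"
    # Keep only uncommented lines such as
    #   ftpagent.sources.busS.spoolDir= /usr/local/ftpdir/busInfo
    grep -E '^[^#]*\.spoolDir[[:space:]]*=' "$conf_file" |
    # Drop the key and the "=", then trim surrounding whitespace.
    sed -E 's/^[^=]*=[[:space:]]*//; s/[[:space:]]*$//' |
    while IFS= read -r dir; do
        if [ -d "$dir" ]; then
            echo "OK      $dir"
        else
            echo "MISSING $dir"
        fi
    done
}
```

Calling check_spool_dirs /usr/local/flume/conf/flumeftp.conf prints one line per source directory; any MISSING line points at a path to create or correct before starting the agent.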

IV. Testing

1) Upload a file over FTP

2) Run Flume

flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/flumeftp.conf --name ftpagent -Dflume.root.logger=INFO,console

3) Errors encountered and their fixes

  • java.lang.IllegalStateException: Directory does not exist: /usr/local/ftpdir/busInfo

        Fix: paths are case-sensitive. Change /usr/local/ftpdir/busInfo in the configuration to /usr/local/ftpdir/businfo so that it matches the directory on disk.

  • java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

        Fix: the required Hadoop JARs are missing from Flume's classpath. Copy the JARs under /usr/hdp/2.6.4.0-91/hadoop/client into /usr/flume/lib/. Alternatively, copy the JARs under share/hadoop/common and share/hadoop/common/lib in the Hadoop distribution into /usr/flume/lib/.

  • Permission denied: user=root, access=WRITE, inode="/source/flume/ftp/busInfo/2018-05-09-15-56-47.1525852607458.tmp":hdfs:hdfs:drwxr-xr-x

        Fix: open up the permissions on the target directory in HDFS: sudo -u hdfs hadoop fs -chmod -R 1777 /source/flume/ftp

  • The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
        Fix:

        In flumeftp.conf, raise the channel's capacity and transactionCapacity (1000 and 100 in the configuration above) to 10000:

        ftpagent.channels.busC.capacity=10000

        ftpagent.channels.busC.transactionCapacity=10000
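
For the NoClassDefFoundError above, the fix comes down to copying the Hadoop client JARs into Flume's lib directory. A rough sketch of that copy as a shell helper follows. The name copy_missing_jars is hypothetical, and skipping JARs that already exist in the destination is an added precaution (to avoid overwriting libraries that ship with Flume), not something the original steps require.

```shell
# Hypothetical helper: copy every *.jar from a source directory into a
# destination directory, leaving files that already exist there untouched.
copy_missing_jars() {
    src_dir="$1"
    dest_dir="$2"
    for jar in "$src_dir"/*.jar; do
        # With no match the pattern stays literal, so skip non-existent paths.
        [ -e "$jar" ] || continue
        name=$(basename "$jar")
        if [ -e "$dest_dir/$name" ]; then
            echo "skip $name"
        else
            cp "$jar" "$dest_dir/" && echo "copy $name"
        fi
    done
}
```

With the paths used in this post, the call would be copy_missing_jars /usr/hdp/2.6.4.0-91/hadoop/client /usr/flume/lib, followed by restarting the agent so the new JARs are picked up.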

4) Check the results

Files on the Hadoop platform

Files on the FTP server