flume自定义Source(taildirSource),自定义Sink(数据库),开发完整步骤

时间:2023-03-09 20:34:43
flume自定义Source(taildirSource),自定义Sink(数据库),开发完整步骤

一、flume简单了解推荐网站(简介包括简单案例部署):  http://www.aboutyun.com/thread-8917-1-1.html

二、我的需求是实现从ftp目录下采集数据,目录下文件名称要符合特定正则,要求文件要一行一行读取并解析后写入数据库。且实现断点续传(服务重启后会从上次读的位置继续)。

  flume1.7.0中taildirSource实现的是监控目录下文件并且一行一行的读取,我只需选用这个source就可以实现。但是服务并不能直接部署在数据所在的服务器上,所以涉及到ftp的问题。这样就需要重写taildirSource,由于本人能力有限,并没有在源码的基础上修改。使用的别的实现方式:用taildirSource配置,但是使用流下载的方式读取数据。

三、直接走步骤,不足之处请指出,目前服务已经部署在使用,有改进优化之处欢迎留言(服务器1,服务器2)

  1.下载:从flume官网下载flume1.7.0  http://flume.apache.org/download.html

  2.安装:上传至服务器1目录/home/hadoop下,解压到apache-flume-1.7.0-bin(安装路径/home/hadoop自行定义)

  3.配置jdk:修改apache-flume-1.7.0-bin/conf/flume-env.sh.template中jdk的路径:eg: export JAVA_HOME=/usr/java/jdk1.8.0_92

  4.配置flume环境,在/etc/profile中增加:
      export FLUME_HOME=/安装路径/apache-flume-1.7.0-bin
      export PATH=$PATH:$FLUME_HOME/bin

  5.重启文件/etc/profile,使用命令source /etc/profile

  6.执行命令flume-ng version ,出现版本号说明安装成功

  -------以上步骤完成flume的安装,下面开始自定义source------

  7.写flume配置文件,在apache-flume-1.7.0-bin/conf/下新建taildirsource.conf

    agent1.sources = tail_dir_source
    agent1.sinks = taildirsink
    agent1.channels = channel1

    # source: tail_dir_source -----------------
    agent1.sources.tail_dir_source.type = com.esa.source.ftp.FtpSource3    //自定义类
    agent1.sources.tail_dir_source.channels = channel1
    agent1.sources.tail_dir_source.positionFile =/opt/flume/taildir_position.json  //json文件位置
    agent1.sources.tail_dir_source.filegroups =f1               //监控文件,配置多个以空格隔开(我是自己写的路径,这个属性没有用到)
    agent1.sources.tail_dir_source.filegroups.f1 =/Desktop/studyflume3/.*    //监控文件路径和正则(我是自己写的路径,这个属性没有用到)
    agent1.sources.tail_dir_source.filePath =/Desktop/studyflume3       //监控ftp的目录 (自己加的)
    agent1.sources.tail_dir_source.regex =.*                 //文件夹正则(自己加的)
    agent1.sources.tail_dir_source.batchSize = 100
    agent1.sources.tail_dir_source.backoffSleepIncrement  = 1000
    agent1.sources.tail_dir_source.maxBackoffSleep  = 5000
    agent1.sources.tail_dir_source.recursiveDirectorySearch = true
    agent1.sources.tail_dir_source.yarnApplicationHeader = true
    agent1.sources.tail_dir_source.yarnContainerHeader = true
    agent1.sources.tail_dir_source.ftpip = 192.160.111.211           //ftp地址
    agent1.sources.tail_dir_source.ftpport = 21                 //ftp端口
    agent1.sources.tail_dir_source.username = root              //用户名
    agent1.sources.tail_dir_source.password = root               //密码

    # Describe taildirsink
    agent1.sinks.taildirsink.type =com.esa.sink.db.OracleSink          //数据库驱动
    agent1.sinks.taildirsink.url=url                       //数据库url                 
    agent1.sinks.taildirsink.user=root                       //数据库用户名
    agent1.sinks.taildirsink.password=root                    //数据库密码
    agent1.sinks.taildirsink.regex=.*                      //数据解析规则
    agent1.sinks.taildirsink.sql=insert into table values(?,?,?)          //入库sql
    agent1.sinks.taildirsink.channel = channel1

    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000
    agent1.channels.channel1.transactionCapactiy = 100

  8.新建maven工程,自定义taildirsource类FtpSource3继承AbstractSource,实现PollableSource, Configurable

public class FtpSource3 extends AbstractSource implements PollableSource, Configurable {

 private static final Logger logger = LoggerFactory.getLogger(FtpSource3.class);
    private String filePath;// 文件目录
    private String regex;// 匹配的正则
    private Table<String, String, String> headerTable;
    private int batchSize;
    private static String positionFilePath;// 校验文件目录
    private boolean skipToEnd;
    private boolean byteOffsetHeader;

    private SourceCounter sourceCounter;
    private FtpReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    private int retryInterval = 1000;
    private int maxRetryInterval = 5000;
    private int idleTimeout;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private int writePosInterval;
    private boolean cachePatternMatching;

    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private boolean fileHeader;
    private String fileHeaderKey;

    private FTPClient ftp;// ftp对象

    private String ftpip;// 需要连接到的ftp端的ip

    private int ftpport;// 连接端口,默认21

    private String username;// 要连接到的ftp端的名字

    private String password;// 要连接到的ftp端的对应得密码
    private Map<String, Long> map;

  //获取配置文件中的配置

  @Override
    public void configure(Context context) {

        String fileGroups = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
        Preconditions.checkState(fileGroups != null,
                "Missing param: " + TaildirSourceConfigurationConstants.FILE_GROUPS);

        filePath = context.getString("filePath");// 文件目录
        regex = context.getString("regex");
        Preconditions.checkState(!filePath.isEmpty(), "Mapping for tailing files is empty or invalid: '"
                + TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX + "'");
        positionFilePath = context.getString("positionFile");// 写绝对路径

        headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
        batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE,
                TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(TaildirSourceConfigurationConstants.SKIP_TO_END,
                TaildirSourceConfigurationConstants.DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT,
                TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL,
                TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(TaildirSourceConfigurationConstants.CACHE_PATTERN_MATCHING,
                TaildirSourceConfigurationConstants.DEFAULT_CACHE_PATTERN_MATCHING);

        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
                PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
                PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(TaildirSourceConfigurationConstants.FILENAME_HEADER,
                TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY,
                TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }

        ftpip = context.getString("ftpip");
        logger.info(ftpip + "---ftpip---");
        Preconditions.checkNotNull(ftpip, "ftpip must be set!!");
        ftpport = context.getInteger("ftpport");
        logger.info(ftpport + "---ftpport---");
        Preconditions.checkNotNull(ftpport, "ftpport must be set!!");
        username = context.getString("username");
        logger.info(username + "---username---");
        Preconditions.checkNotNull(username, "username must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
    }

  //服务开始时执行一遍

  @Override
    public void start() {

        logger.info("{} TaildirSource source starting with directory: {}", getName(), filePath);
        logger.info("filePath==" + filePath);
        logger.info("positionFilePath==" + positionFilePath);
        

        // 先连接ftp
        ftp = new FTPClient();

        // 验证登录
        try {
            ftp.connect(ftpip, ftpport);
            System.out.println(ftp.login(username, password));

            ftp.setCharset(Charset.forName("UTF-8"));
            ftp.setControlEncoding("UTF-8");
            logger.info("fileHeaderKey==" + fileHeaderKey);
            logger.info("ftp==" + ftp);
             ftp.enterLocalActiveMode();//ftp客户端一定要切换主动模式,不然每次都会请求一个端口,ftp服务器端要设置成被动模式
             logger.info("转换路径是否成功" + ftp.changeWorkingDirectory("/"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 创建json文件(用dom4j写的xml格式的文件,每个节点记录一个文件及位置)
        try {
            create();
        } catch (Exception e1) {
            e1.printStackTrace();
            logger.error("----创建文件失败----");
        }
        super.start();
        logger.debug("TaildirSource started");
        sourceCounter.start();

    }

@Override
    public Status process() throws EventDeliveryException {

        // 从ftp下载文件到本地
        Status status = Status.READY;
        logger.info("--进入主程序--");
        logger.info("--filePath--" + filePath);

        if (ftp.isConnected()) {
            logger.info("--ftp--" + ftp);
            try {
                FTPFile[] ftpFiles = null;
                // ftp.enterLocalPassiveMode();
                ftp.enterLocalActiveMode();
                boolean a = ftp.changeWorkingDirectory(filePath);
                logger.info("转换路径是否成功" + a);
                if (a) {
                    logger.info("---开始ftp.listFiles()---");
                    ftpFiles = ftp.listFiles(filePath);
                    logger.info("目录下文件个数==" + ftpFiles.length);

      //我这里规定好了文件结构,所以直接找结构里的文件。
                    for (int i = 0; i < ftpFiles.length; i++) {//ANH
                        if(ftpFiles[i].isDirectory()){
                            System.out.println("---isdirectory---" + ftpFiles[i]);
                            try {
                                System.out.println("---开始ftp.listFiles()---" + filePath + "/" + ftpFiles[i].getName());
                                FTPFile[] files = ftp.listFiles(filePath + "/" + ftpFiles[i].getName());
                                System.out.println("目录下文件个数---" + files.length);
                                for (int j = 0; j < files.length; j++) {//SESSIONLOG
                                    if(files[j].getName().equals("SESSIONLOG")){
                                        System.out.println("---isdirectory---" + "SESSIONLOG");
                                        System.out.println(ftp.changeWorkingDirectory(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG"));
                                        FTPFile[] files1 = ftp.listFiles(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        for(FTPFile sFile : files1){
                                            System.out.println(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG" + "下面的文件" + sFile);
                                            downloderFile(sFile, filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
                                        }
                                        
                                    }
                                    
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        
                    }
                } else {
                    logger.error("连接的fpt上没有制定下载路径,没法下载数据!");
                }
            } catch (Exception e) {
                logger.error("下载失败!");
                logger.error("详情", e);
                status = Status.BACKOFF;
            }
        }
       
        return status;
    }

//下载文件

private void downloderFile(FTPFile ftpFile, String ftpFileDir) {
        // 筛选文件,下载那些文件
//        if (ftpFile.isDirectory()) {
//            logger.info("----isDirectory----");
//            try {
//                FTPFile[] files = ftp.listFiles(ftpFileDir + "/" + ftpFile.getName());
//                logger.info("---递归找文件---" + files.length);
//                for (int i = 0; i < files.length; i++) {
//                    System.out.println(files[i]);
//                    System.out.println(ftp.changeWorkingDirectory(ftpFileDir + "/" + ftpFile.getName()));
//                    ;
//                    downloderFile(files[i], ftpFileDir + "/" + ftpFile.getName());
//                }
//            } catch (IOException e) {
//                e.printStackTrace();
//            }
//        } else
        if(ftpFile.isFile()) {
            logger.info("---isfile---");
            if (ftpFile.getName().matches(regex)) {
                logger.info("---匹配成功---");
                logger.info("---ftpfile--" + ftpFile);
                map = new HashMap<>();
                // 把文件中数据加载到map中
                try {
                    SAXReader reader = new SAXReader();
                    Document doc = reader.read(new File(positionFilePath));
                    Element root = doc.getRootElement();
                    Element foo;
                    for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                        foo = (Element) i.next();
                        if (!foo.elementText("name").isEmpty() && !foo.elementText("size").isEmpty()) {
                            long size = Long.parseLong(foo.elementText("size"));
                            map.put(foo.elementText("name"), size);
                        }

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }

                logger.info("map====" + map);
                if (map.containsKey(ftpFile.getName())) {
                    logger.info("--- 进入包含---" + ftpFile.getName());
                    // 要修改文件中的值
                    readAppointedLineNumber(ftpFile, map.get(ftpFile.getName()));
                } else {
                    logger.info("--- 进入不包含---" + ftpFile.getName());
                    // 要写文件增加节点
                    InputStream iStream = null;
                    InputStreamReader inputStreamReader = null;
                    BufferedReader br = null;
                    try {

       // 这里定义一个字符流的输入流的节点流,用于读取文件(一个字符一个字符的读取)
                        iStream = ftp.retrieveFileStream(new String(ftpFile.getName().getBytes("gbk"),"utf-8"));                     
                        if (iStream != null) {
                            logger.info("---流不为空---");
                            inputStreamReader = new InputStreamReader(iStream);
                            br = new BufferedReader(inputStreamReader); // 在定义好的流基础上套接一个处理流,用于更加效率的读取文件(一行一行的读取)
                            long x = 0; // 用于统计行数,从0开始
                            String string = br.readLine();
                            while (string != null) { // readLine()方法是按行读的,返回值是这行的内容
                                x++; // 每读一行,则变量x累加1
                                getChannelProcessor().processEvent(EventBuilder.withBody(string.getBytes()));
                                string = br.readLine();
                            }
                            System.out.println(x);
                            try {
                                // 增加节点
                                add(ftpFile.getName(), x + "");
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (iStream != null) {
                            try {
                                iStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (inputStreamReader != null) {
                            try {
                                inputStreamReader.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        try {
                            ftp.getReply();// 加上这个防止第二次获取文件流为空的问题
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                }

            }
        }

    }

/**
     * 创建文件,存储文件及文件位置
     *
     * @param file
     * @throws Exception
     */
    public static void create() throws Exception {
        File file = new File(positionFilePath);
        if(file.exists()){
            logger.info("---json文件已存在---");
        }else{
            Document document = DocumentHelper.createDocument();
            Element root = document.addElement("rss");
            root.addAttribute("version", "2.0");
            XMLWriter writer = new XMLWriter(new FileOutputStream(new File(positionFilePath)), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否转义,默认true
            writer.write(document);
            writer.close();
            logger.info("---创建文件成功---");
        }
    }

    /**
     * 增加文件节点
     */
    public void add(String name, String length) throws Exception {
        logger.info("---开始增加节点---");
        SAXReader reader = new SAXReader();
        Document document = reader.read(new File(positionFilePath));
        Element root = document.getRootElement();
        try {
            Element resource = root.addElement("files");
            Element nameNode = resource.addElement("name");
            Element sizeNode = resource.addElement("size");
            nameNode.setText(name);
            sizeNode.setText(length);
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否转义,默认true
            writer.write(document);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("---结束增加节点---");
    }

    /**
     * 更新节点
     *
     * @throws IOException
     */
    public void Update(String name, String size) throws IOException {
        logger.info("---开始文件更新---");
        SAXReader reader = new SAXReader();
        Document document = null;
        try {
            document = reader.read(new File(positionFilePath));
            Element root = document.getRootElement();
            Element foo;
            for (Iterator i = root.elementIterator("files"); i.hasNext();) {
                foo = (Element) i.next();

                if (name.equals(foo.elementText("name"))) {
                    foo.element("size").setText(size);

                }
            }
            XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
            writer.setEscapeText(false);// 字符是否转义,默认true
            writer.write(document);
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        OutputFormat format = OutputFormat.createPrettyPrint();
        format.setEncoding("UTF-8");
        XMLWriter writer = new XMLWriter(new OutputStreamWriter(new FileOutputStream(positionFilePath)), format);
        writer.write(document);
        writer.close();
        logger.info("---文件更新成功---");
    }

@Override
    public void stop() {
        logger.info("----执行结束---");
        super.stop();
        if (ftp != null) {
            try {
                ftp.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private Table<String, String, String> getTable(Context context, String prefix) {
        Table<String, String, String> table = HashBasedTable.create();
        for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
            String[] parts = e.getKey().split("\\.", 2);
            table.put(parts[0], parts[1], e.getValue());
        }
        return table;
    }

}

9.重写sink,新建类OracleSink 继承AbstractSink 实现Configurable

  public class OracleSink extends AbstractSink implements Configurable {

    private Logger logger = LoggerFactory.getLogger(OracleSink.class);
    private String driverClassName;
    private String url;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;
    private String regex;
    private String sql;
    public OracleSink() {
        logger.info("---start---");
    }

    @Override
    public void configure(Context context) {
        url = context.getString("url");
        logger.info(url + "---url---");
        Preconditions.checkNotNull(url, "url must be set!!");
        driverClassName = context.getString("driverClassName");
        logger.info(driverClassName + "---driverClassName---");
        Preconditions.checkNotNull(driverClassName, "driverClassName must be set!!");
        user = context.getString("user");
        logger.info(user + "---user---");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        logger.info(password + "---password---");
        Preconditions.checkNotNull(password, "password must be set!!");
        regex = context.getString("regex");
        logger.info(regex + "---regex---");
        Preconditions.checkNotNull(regex, "regex must be set!!");
        sql = context.getString("sql");
        logger.info(sql + "---sql---");
        Preconditions.checkNotNull(sql, "sql must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        logger.info(batchSize + "---batchSize---");
        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");

    }

    @Override
    public void start() {
        super.start();
        try {
            //调用Class.forName()方法加载驱动程序
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
//        String url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + databaseName;
        //调用DriverManager对象的getConnection()方法,获得一个Connection对象
        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            //创建一个Statement对象
            preparedStatement = conn.prepareStatement(sql);

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }
    
    @Override
    public void stop() {
        logger.info("----执行结束---");
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<Info> infos = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {//对事件进行处理
                    logger.info("---读取数据---");
                    logger.info("--数据库连接是否关闭--" + conn.isClosed());
                    content = new String(event.getBody());
                    Info info=new Info();
                    if(content != null && content != ""){
                        logger.info("---" + content + "---");
                        logger.info("---" + regex + "---");
                        //文件的字段顺序固定且有值?
                        Pattern p = Pattern.compile(regex);
                        Matcher m = p.matcher(content);
                        if (m.find()) {
                            logger.info("匹配上的长度==" + m.groupCount());
                            for (int j = 1; j <= m.groupCount(); j++) {
                                logger.info("---每个匹配后的值" + j + "=" + m.group(j) + "---");
                                switch (j) {
                                case 1:
                                    info.setProvinceid(m.group(j));
                                    break;
                                case 2:
                                    info.setProvincename(m.group(j));
                                    break;
                                case 3:
                                    info.setSessionid(m.group(j));
                                    break;
                                case 4:
                                    info.setMainacctid(m.group(j));
                                    break;
                                case 5:
                                    info.setMainacctname(m.group(j));
                                    break;
                                case 6:
                                    info.setSlaveacctid(m.group(j));
                                    break;
                                case 7:
                                    info.setOrgid(m.group(j));
                                    break;
                                case 8:
                                    info.setOrgname(m.group(j));
                                    break;
                                case 9:
                                    info.setClientaddr(m.group(j));
                                    break;
                                case 10:
                                    info.setResid(m.group(j));
                                    break;
                                case 11:
                                    info.setResip(m.group(j));
                                    break;
                                case 12:
                                    info.setResport(m.group(j));
                                    break;
                                case 13:
                                    info.setBelongsysid(m.group(j));
                                    break;
                                case 14:
                                    info.setBelongsysname(m.group(j));
                                    break;
                                case 15:
                                    info.setLogintime(m.group(j));
                                    break;
                                case 16:
                                    info.setLogouttime(m.group(j));
                                    break;
                                case 17:
                                    info.setVideoname(m.group(j));
                                    break;
                                case 18:
                                    info.setVideofiledir(m.group(j));
                                    break;
                                case 19:
                                    info.setReserve(m.group(j));
                                    break;

                                default:
                                    break;
                                }
                            }
                            infos.add(info);
                        }
                        
                    }
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }

            if (infos.size() > 0) {
                logger.info("infos的长度==" + infos.size());
                preparedStatement.clearBatch();
                for (Info temp : infos) {
                    preparedStatement.setString(1, temp.getProvinceid() != null ? temp.getProvinceid() : "");
                    preparedStatement.setString(2, temp.getProvincename() != null ? temp.getProvincename() : "");
                    preparedStatement.setString(3, temp.getSessionid() != null ? temp.getSessionid() : "");
                    preparedStatement.setString(4, temp.getMainacctid() != null ? temp.getMainacctid() : "");
                    preparedStatement.setString(5, temp.getMainacctname() != null ? temp.getMainacctname() : "");
                    preparedStatement.setString(6, temp.getSlaveacctid() != null ? temp.getSlaveacctid() : "");
                    preparedStatement.setString(7, temp.getOrgid() != null ? temp.getOrgid() : "");
                    preparedStatement.setString(8, temp.getOrgname() != null ? temp.getOrgname() : "");
                    preparedStatement.setString(9, temp.getClientaddr() != null ? temp.getClientaddr() : "");
                    preparedStatement.setString(10, temp.getResid() != null ? temp.getResid() : "");
                    preparedStatement.setString(11, temp.getResip() != null ? temp.getResip() : "");
                    preparedStatement.setString(12, temp.getResport() != null ? temp.getResport() : "");
                    preparedStatement.setString(13, temp.getBelongsysid() != null ? temp.getBelongsysid() : "");
                    preparedStatement.setString(14, temp.getBelongsysname() != null ? temp.getBelongsysname() : "");
                    preparedStatement.setString(15, temp.getLogintime() != null ? temp.getLogintime() : "");
                    preparedStatement.setString(16, temp.getLogouttime() != null ? temp.getLogouttime() : "");
                    preparedStatement.setString(17, temp.getVideoname() != null ? temp.getVideoname() : "");
                    preparedStatement.setString(18, temp.getVideofiledir() != null ? temp.getVideofiledir() : "");
                    preparedStatement.setString(19, temp.getReserve() != null ? temp.getReserve() : "");
                    preparedStatement.addBatch();
                }
                try{
                    preparedStatement.executeBatch();
                }catch(SQLException e){
                    logger.error("------批量执行sql错误");
                    e.printStackTrace();
                }
                

                conn.commit();
            }
            transaction.commit();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                conn.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            try {
                transaction.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }

}

10.把maven打包,放在apache-flume-1.7.0-bin/lib下,把依赖的包比如dom4j、ojdbc6也放进去。

11.运行flume,在apache-flume-1.7.0-bin写命令: 

  //运行flume(按ctrl+c可以停止服务)

  flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console

  //后台运行flume(后台运行的flume,写命令ps  -ef|grep taildirsource.conf或者ps  -ef|grep flume ,获取进程id,然后kill掉)

  flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console &