对fastdfs 文件清单进行检查,打印无效的文件
2017年12月12日 18:37:18 守望dfdfdf 阅读数:281 标签: fastdfssftpmysql 更多
个人分类: 工作 问题
编辑
版权声明:本文为博主原创文章,转载请注明文章链接。 https://blog.****.net/xiaoanzi123/article/details/78752333
需求说明:
写程序 获取所有的fastdfs文件目录清单,然后去数据库进行比对【数据库某个字段就是存的fastdfs文件的目录路径】,来确认 数据库是否存在对应的数据记录。 需求目的就是确认 文件服务器文件记录 和数据库 记录不一致的有哪些数据,把他们输出出来。
困难点:
1、从没有接触过fastdfs 分布式文件系统,对其了解始于现在,但是发现一篇很不错的fastdfs介绍文章,可以学习,链接如下:
http://blog.****.net/wallwind/article/details/39891105
2、不知道如何获取fastdfs文件列表。在网上查阅得知 没有此需求对应的api,但发现一篇文章,只不过是python实现的。如下:
https://www.cnblogs.com/xiaolinstudy/p/7777123.html python 才刚刚接触,一脸茫然。。。。。
目前已知的的是,在百度上了解:“”没有这样的API。如果确实需要这个功能,可以分析data/sync/目录下的binlog文件。“”
刚刚向老同学咨询下这个问题,真是一语点醒梦中人。他说把上面的python代码和这句话联系起来 。
解决问题的过程中自己只在表面,没有深入,如果想要深入长远发展下去,这种坏习惯要不得,自己以后必须要注意。
回归正题:
首先我要找到fastdfs所在的linux服务器,从没见过,我想看看这个binlog到底是个什么玩意,还有fastdfs配置的 base_path 路径我也不知道。
折腾半天才在运维系统平台找到文件服务器linux对应ip,又找人问了root 的密码。
我以为data目录 在 / 下,发现没有。那就先找base_path路径。
翻阅博客发现 在 /etc/fdfs 目录下,有storage.conf 和 tracker.conf 两个配置文件。
了解到tracker用于控制调度的,storage用于具体的执行。所以打开storage.conf查看。
【注:我第一次打开的tracker.conf,打开发现base_path,按照路径去找没有找到想要的binlog文件】,打开storage.conf,发现配置的base_path,
按照这个路径去找,果然发现了期待的data目录,data下有sync目录,打开sync,看到了期待的binlog文件。
binlig.index 打开发现为空,binlog.000打开,看到了期待的数据目录信息。
接下来,要在java中远程访问这个文件,把内容读取到本地,生成文件保存。参考了这篇博客的的代码:http://blog.****.net/qq_29663071/article/details/50497095
代码如下:【要引入jar包 j2ssh-core-0.2.9 我会上传到资源里面。】
/* * 远程访问 fastdfs 所在 linux服务器,获取 binlog.000 文件数据 */ public static void getBinlog() throws IOException{ SshClient client=new SshClient(); client.connect("1xxxxx87"); //此处是fastdfs 所在的 Linux服务器IP //设置用户名和密码 PasswordAuthenticationClient pwd = new PasswordAuthenticationClient(); pwd.setUsername("root"); pwd.setPassword("密码"); int result=client.authenticate(pwd); if(result==AuthenticationProtocolState.COMPLETE){ //如果连接完成 File file = new File("D:\\text.txt"); OutputStream os = new FileOutputStream(file); try{ client.openSftpClient().get("linux中binlog文件的绝对路径", os); }catch (Exception e) { e.printStackTrace(); System.out.println("获取binlog.000数据失败"); } } }
到此,成功把binlog数据储存到本地。接下来我的想法是把这个生成text.txt 文件,读取出来,每一条放入到一个list中,最后我遍历list,去和数据库中的记录对比来做验证。可是这个文件太大了,近350MB,sublime显示有616万条数据。我是第一次杰出这么大的数据量,有点慌。 ---------------------
果然,出问题了。代码如下
public static List<String> readFile(File fileRead){ 【注:已经把text.txt new 过了,作参数穿进来 】 List<String> listBinlog = new ArrayList<String>(); BufferedReader reader = null; try { //"以行为单位读取文件内容,一次读一整行:"); reader = new BufferedReader(new FileReader(fileRead)); String tempString = null; //一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { listBinlog.add(tempString);//我想把每一行数据放到list中。 } reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } return listBinlog; }
很明显,六百多万条数据【一开始没在意到会那么多】,绝对的,内存溢出。
我就改为尝试下面两种方法
List<String> listBinlog = new ArrayList<String>(); //Java.util.Scanner类扫描文件的内容,一行一行连续地读取 FileInputStream inputStream = null; Scanner sc = null; try{ inputStream = new FileInputStream(path); sc = new Scanner(inputStream,"UTF-8"); while(sc.hasNextLine()) { String line = sc.nextLine(); listBinlog.add(line); } // note that Scanner suppresses exceptions if(sc.ioException() != null) { throw sc.ioException(); } }finally{ if(inputStream != null) { inputStream.close(); } if(sc != null) { sc.close(); } } return listBinlog; ----------------------------------------- List listBinlog = new ArrayList<String>(); //Apache Commons IO流 解决读取大文件 内存溢出问题 LineIterator it = FileUtils.lineIterator(fileRead, "UTF-8"); try{ while(it.hasNext()) { String line = it.nextLine(); listBinlog.add(line); } }finally{ LineIterator.closeQuietly(it); } return listBinlog;
----------------
上面的方法参考地址:http://blog.****.net/win7system/article/details/53747032
结果还是内存溢出。很明显,没什么改进,又尝试初始化链表时指定容量,还不行,之所以内存溢出,就是因为那么大的数据放在一个集合中导致的。
这时经理给我进行了指导,他的意思我把这些数据每一条存到本地数据库里面,添加flag等字段,标志该条数据的比对结果。如果放在list中,就算放得下没有内存溢出,
万一出现问题,之前的遍历白费了,还要重头开始,最好有个标记。
思路开始转变,之前我也没想过 还能去用本地数据库这一点。。。 我就在mysql建表,一个字段放想要存的 fastdfs文件dir ,另外一个字段作标志位。还是之前的一行一行读取数据,把读到的每一行数据处理下(只要自己想要的那一段字符串),存到数据库里面。这时候必须考虑用批处理,但是批处理也有容量啊,要不然肯定还是内存溢出,
参考了这篇文章:http://blog.****.net/babyniu411/article/details/47099783, 用事务 和 批处理 结合的形式类解决这个问题。
我的代码如下:
Class.forName("com.mysql.jdbc.Driver"); Connection coon = DriverManager.getConnection("jdbc:mysql://localhost:3306/binlogdata?" + "user=root&password=xxxxx&useUnicode=true&characterEncoding=UTF8"); //以行 为单位 读取存入本地的 这个文件,把 每行数据处理后存到mysql中 String allFileNamePath = "text.txt文件的本地路径"; File fileRead = new File(allFileNamePath); //System.out.println("存入数据库完毕"); FileInputStream inputStream = null; Scanner sc = null; inputStream = new FileInputStream(allFileNamePath); sc = new Scanner(inputStream,"UTF-8"); int i = 1; //多批次 按事务提交 coon.setAutoCommit(false); // 开始事务 Statement stmt = coon.createStatement(); while(sc.hasNextLine()) { String line = sc.nextLine(); String str = line.substring(13).trim(); String sql = "INSERT INTO BINLOGDATA (DIR,ISEXIT) VALUES ('"+str+"','0')"; stmt.addBatch(sql); i++; if((i%50000==0&& i!=0)){ stmt.executeBatch(); coon.commit(); coon.setAutoCommit(false);// 开始事务 stmt = coon.createStatement(); } } coon.commit();// 提交事务 System.out.println("存入数据库完毕"); stmt.close(); coon.close(); // note that Scanner suppresses exceptions if(sc.ioException() != null) { throw sc.ioException(); }
因为之前我也试了一下没用批处理的方式,速度对比简直天壤之别。
根据sublime打开显示,数据一共六百多万,我发现库里id出现九百多万的编号,肯定有问题。后发现是我建表的原因,因为我把di字段设为了自增长,由于之前清空了已经插入的数据,重新插入数据,id也就从六百多万开始计数了。取消自增长,此问题解决。但是上述代码出现一个问题,最后一批数据不足
50000的那一组并没有执行插入数据库的操作【通过获取文件总行数解决】。而且有一个大bug,i++的位置放错了,应该放到循环体的最后。。。。。
下面先放上最终提交的 所有代码 :
/* * mian()方法调用 compareFile(); 对FS 文件清单检查,打印无效的文件 * */ public static void compareFile() throws Exception { //getBinlog(); //获取 fastdfs 所在服务器 的binlog.000文件中的数据并存入本地文件中 Class.forName("com.mysql.jdbc.Driver"); //本地mysql数据库 String connect = "jdbc:mysql://localhost:3306/binlogdata?" + "user=root&password=XXXXX&useUnicode=true&characterEncoding=UTF8"; Connection coon = DriverManager.getConnection(connect); //对取回的数据文件 进行数据处理 存储到本地mysql 中 handingFile(coon); //从本地数据库获取数据,进行比对得出结果 //待完成 coon.close(); } /* * 读取 文件,数据处理后添加到本地数据库mysql中 */ public static void handingFile(Connection coon) throws ClassNotFoundException, SQLException, IOException{ //以行 为单位 读取存入本地的 这个文件,把 每行数据处理后存到mysql中 String allFileNamePath = "D:\\text.txt"; File fileRead = new File(allFileNamePath); FileInputStream inputStream = null; inputStream = new FileInputStream(allFileNamePath); //获取文件 行数 int lineNum = getLineNum(fileRead); //耗时20s @SuppressWarnings("resource") Scanner sc = new Scanner(inputStream,"UTF-8"); int i = 1; //多批次 按事务提交 coon.setAutoCommit(false); // 开始事务 String sql = "INSERT INTO BINLOGDATA (DIR,exitfastdfs,ID,exitdatabase) VALUES ( ? ,'1',?,'0')"; PreparedStatement stmt = coon.prepareStatement(sql); //预编译 while(sc.hasNextLine()) { if(i > lineNum){ break; } String line = sc.nextLine(); String str = line.substring(13).trim(); stmt.setString(1, str); stmt.setInt(2, i); stmt.addBatch(); if((i%500000==0&& i!=0 || i == lineNum)){ stmt.executeBatch(); coon.commit(); stmt.close(); coon.setAutoCommit(false);// 开始新的批次的事务 stmt = coon.prepareStatement(sql); } i++; } coon.commit();// 提交事务 System.out.println("存入数据库完毕"); coon.setAutoCommit(true); stmt.close(); //coon.close(); // note that Scanner suppresses exceptions if(sc.ioException() != null) { throw sc.ioException(); } } /* * 获取文件行数 */ public static int getLineNum(File file){ int index = 0; try { FileReader fileReader = new FileReader(file); BufferedReader bufferedReader = new BufferedReader(fileReader); while (bufferedReader.readLine()!=null){ index++; } } catch (IOException e) { e.printStackTrace(); } return index; } /* * 远程访问 fastdfs 所在 linux服务器,获取 binlog.000 文件数据 */ public static void getBinlog() throws IOException{ SshClient client=new SshClient(); client.connect("xxxxxxxx"); //此处是fastdfs 所在的 Linux服务器IP //设置用户名和密码 PasswordAuthenticationClient pwd = new PasswordAuthenticationClient(); pwd.setUsername("root"); pwd.setPassword("xxxxx"); int result=client.authenticate(pwd); if(result==AuthenticationProtocolState.COMPLETE){ //如果连接完成 File file = new File("D:\\text.txt"); OutputStream os = new FileOutputStream(file); try{ client.openSftpClient().get("binlog.000文件所在的服务器路径", os); }catch (Exception e) { e.printStackTrace(); System.out.println("获取binlog.000数据失败"); } } }
由于生产环境数据库无法连接,无法获取对应表中的字段来比对进行验证,所以这个任务暂时先完成到这一步。以后再说。
上述代码我还有两个疑问:
我是用分批次的事务和批出理 来吧数据填入本地mysql的。后来我把每次事务的提交改为50万,六百多万条数据大概15min左右。①这个addbatch有没有一个容量限制?合理容量是多大?
②每一个批次的事务提交后,我都把stmt关闭了,然后开启下一轮事务,在开新的stmt。从优化上讲,要不要这样,每一次关还是不关?不过connection我是最后关的,就一次。
stmt.executeBatch();
coon.commit();
stmt.close();
coon.setAutoCommit(false);// 开始新的批次的事务
stmt = coon.prepareStatement(sql);
这两个疑问有困惑,真心期待大佬赐教。若有错误欢迎指出,我一定认真接受。
-------------------------------------------------------分割线--------------------------------------------------------------
小总结:自己还是缺乏大量实战编程经验,踩过才知道有坑。因为很多地方并不是说很难,但是自己经验不足,总是犯一些低级错误,感觉路还很长很长。