Use case: download the table directories created by different users. For each directory, the program runs a hadoop cat command to pull the files to the local machine, then transfers them to the target server via FTP, and records the modification time of the HDFS directory in MySQL. On every run, the timestamps recorded in MySQL are compared with the modification times of the HDFS paths in the current batch; only paths whose modification time has changed are downloaded again:
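The bookkeeping relies on a small MySQL table keyed by HDFS path. Its exact DDL is not part of the original program; the following is a minimal sketch inferred from the select/insert/update statements issued in JdbcDirectUtils below:

create table download_time (
    path        varchar(512) not null,  -- full HDFS path of the table directory
    modify_time bigint       not null,  -- HDFS modification time in milliseconds
    primary key (path)
);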
Entry point:
package edm.spark.download.edm.spark.download;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.fs.Path;

import edm.spark.download.edm.spark.util.HdfsFileProcessor;
import edm.spark.download.edm.spark.util.JdbcDirectUtils;

public class FileDownload {

    public static void main(String[] args) throws Exception {
        String local_path = args[0]; // e.g. "/home/hdfs/ysy/"
        String hdfs_path = args[1];  // e.g. "hdfs://hdp/user/"
        HdfsFileProcessor fileProcessor = new HdfsFileProcessor();
        List<String> userLists = fileProcessor.getUserUnderFolder(hdfs_path);
        List<Path> listPath = fileProcessor.getFileUnderFolder(userLists);
        if (null != listPath && listPath.size() > 0) {
            for (Path path : listPath) {
                String pathName = path.toString();
                String[] nameList = pathName.split("/");
                String time = JdbcDirectUtils.DateTimeFormat(new Date());
                String tableName = nameList[nameList.length - 1] + "_" + time + ".txt";
                String userName = nameList[nameList.length - 3];
                Process ps = null;
                try {
                    // submit the local download script for this table directory
                    ps = Runtime.getRuntime().exec(
                            local_path + "download.sh " + pathName + " "
                                    + tableName + " " + userName);
                    System.out.println(local_path + "download.sh " + pathName
                            + " " + tableName);
                    // update the modification time recorded in MySQL
                    JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
                    long dateTime = jdbcForTime.queryDate(
                            "select modify_time,path from download_time where path="
                                    + "'" + path.toString() + "'");
                    long insertTime = fileProcessor.getModificationTime(path);
                    if (dateTime != 0) {
                        jdbcForTime.updateDateTime(insertTime, pathName);
                    } else {
                        // first run for this path: insert the current directory time
                        jdbcForTime.insertDate(insertTime, path.toString());
                    }
                    jdbcForTime.destroy();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (ps != null) {
                    // collect and print the script's output
                    BufferedReader br = new BufferedReader(new InputStreamReader(
                            ps.getInputStream()));
                    String line;
                    StringBuilder sb = new StringBuilder();
                    while ((line = br.readLine()) != null) {
                        sb.append(line).append("\n");
                    }
                    br.close();
                    System.out.println(sb.toString());
                    ps.destroy();
                }
            }
        } else {
            System.out.println("no file to download");
        }
    }
}
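FileDownload only shells out to download.sh; the script itself is not part of the listings. Based on the description above (hadoop cat to the local disk, then FTP to the target server) and the three arguments passed in FileDownload, a minimal sketch could look like the following; the FTP host, credentials and remote directory are placeholders, not taken from the original program:

#!/bin/bash
# args: $1 = HDFS table directory, $2 = target file name, $3 = user name
hdfs_path="$1"
table_file="$2"
user_name="$3"
local_dir="/home/hdfs/ysy"

# concatenate the directory's files into one local text file
hadoop fs -cat "${hdfs_path}/*" > "${local_dir}/${table_file}"

# push the file to the target server (host and credentials are hypothetical)
ftp -n ftp.example.com <<EOF
user ftpuser ftppassword
cd /upload/${user_name}
put ${local_dir}/${table_file}
bye
EOF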
HdfsFileProcessor:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class HdfsFileProcessor {

    static final Logger logger = LoggerFactory.getLogger(HdfsFileProcessor.class);

    protected FileSystem fileSystem;
    private Configuration conf;

    public HdfsFileProcessor() {
        init();
    }

    public void init() {
        conf = new Configuration();
        conf.addResource("resources/hdfs-site.xml");
        conf.addResource("resources/core-site.xml");
        try {
            fileSystem = FileSystem.get(conf);
        } catch (IOException e) {
            logger.error("init error.......", e);
            e.printStackTrace();
        }
    }

    public final boolean checkFile(String filePath) {
        boolean exists = false;
        try {
            Path path = new Path(filePath);
            exists = fileSystem.exists(path);
        } catch (Exception e) {
            logger.error("", e);
        }
        return exists;
    }

    /**
     * Collect the table directories under each user's "tosas" folder whose
     * HDFS modification time is newer than the time recorded in MySQL.
     */
    public List<Path> getFileUnderFolder(List<String> names) throws IOException, SQLException {
        JdbcDirectUtils jdbcForTime = new JdbcDirectUtils();
        List<Path> paths = Lists.newArrayList();
        for (String name : names) {
            Path folderPath = new Path("hdfs://hdp/user/" + name + "/");
            if (fileSystem.exists(folderPath)) {
                try {
                    FileStatus[] fileStatus = fileSystem.listStatus(folderPath);
                    for (int i = 0; i < fileStatus.length; i++) {
                        Path path = fileStatus[i].getPath();
                        if (path.toString().contains("tosas")) {
                            FileStatus[] tableStatus = fileSystem.listStatus(path);
                            for (int j = 0; j < tableStatus.length; j++) {
                                Path tablePath = tableStatus[j].getPath();
                                long modificationTime = fileSystem.getFileStatus(tablePath).getModificationTime();
                                long dataTime = jdbcForTime.queryDate(
                                        "select modify_time,path from download_time where path="
                                                + "'" + tablePath.toString() + "'");
                                if (modificationTime > dataTime) {
                                    paths.add(tablePath);
                                }
                            }
                        }
                    }
                } catch (RemoteException e) {
                    logger.error("", e);
                } catch (AccessControlException e) {
                    logger.error("", e);
                }
            }
        }
        return paths;
    }

    /**
     * Get the modification time of the given HDFS path.
     */
    public long getModificationTime(Path path) throws IOException {
        return fileSystem.getFileStatus(path).getModificationTime();
    }

    /**
     * Find the user names that own directories under the given path.
     */
    public List<String> getUserUnderFolder(String hdfsPath) throws Exception {
        List<String> userList = Lists.newArrayList();
        Path userPath = new Path(hdfsPath);
        if (fileSystem.exists(userPath)) {
            FileStatus[] fileStatus = fileSystem.listStatus(userPath);
            for (int i = 0; i < fileStatus.length; i++) {
                String[] pathes = fileStatus[i].getPath().toString().split("/");
                if (pathes.length > 4) {
                    userList.add(pathes[4]);
                }
            }
        }
        return userList;
    }

    public void destroy() throws IOException {
        if (fileSystem != null) {
            fileSystem.close();
        }
        fileSystem = null;
    }
}
JdbcDirectUtils:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import com.google.common.collect.Maps;

public class JdbcDirectUtils {

    private static Connection conn;
    private Statement stmt;
    private String file_dir = "/template/download_mysql.txt";
    private Map<String, String> jdbcConfMap = Maps.newHashMap();
    private LoadHdfsConf mysqlConf;

    public JdbcDirectUtils() {
        initDriver();
    }

    public void initDriver() {
        try {
            if (conn == null) {
                mysqlConf = new LoadHdfsConf();
                jdbcConfMap = mysqlConf.readHdfsFile(file_dir);
                Class.forName("com.mysql.jdbc.Driver");
                String url = "jdbc:mysql://" + jdbcConfMap.get("url") + ":"
                        + jdbcConfMap.get("port") + "/"
                        + jdbcConfMap.get("schema") + "?user="
                        + jdbcConfMap.get("user") + "&password="
                        + jdbcConfMap.get("password")
                        + "&useUnicode=true&characterEncoding="
                        + jdbcConfMap.get("characterEncoding");
                conn = DriverManager.getConnection(url);
            }
            // create the Statement used by the query and update helpers below
            stmt = conn.createStatement();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Update the recorded modification time for the given path.
     */
    public void updateDateTime(long date, String path) throws SQLException {
        stmt.executeUpdate("update download_time set modify_time=" + date
                + " where path=" + "'" + path + "'");
    }

    /**
     * Query the modification time recorded for a path; returns 0 if no row exists.
     */
    public long queryDate(String sql) throws SQLException {
        ResultSet rs = stmt.executeQuery(sql);
        long dateTime = 0;
        while (rs.next()) {
            dateTime = rs.getLong("modify_time");
        }
        rs.close();
        return dateTime;
    }

    /**
     * Insert the first record for a path.
     */
    public void insertDate(Long date, String path) throws SQLException {
        stmt.executeUpdate("insert into download_time(path,modify_time) values "
                + "('" + path + "'" + "," + date + ")");
    }

    /**
     * Convert a yyyyMMdd date string to a long timestamp.
     */
    public long convert2Long(String date) {
        long time = 0;
        String format = "yyyyMMdd";
        SimpleDateFormat sf = new SimpleDateFormat(format);
        try {
            time = sf.parse(date).getTime();
        } catch (java.text.ParseException e) {
            e.printStackTrace();
        }
        return time;
    }

    public static String DateTimeFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        return sdf.format(date);
    }

    public void destroy() throws SQLException {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
        conn = null;
    }
}
LoadHdfsConf:
package edm.spark.download.edm.spark.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

public class LoadHdfsConf {

    static final Logger logger = LoggerFactory.getLogger(LoadHdfsConf.class);

    protected FileSystem fileSystem;

    public final boolean checkFile(String filePath) {
        boolean exists = false;
        try {
            Path path = new Path(filePath);
            exists = fileSystem.exists(path);
        } catch (Exception e) {
            logger.error("", e);
        }
        return exists;
    }

    /**
     * Read a key=value configuration file from HDFS into a map.
     */
    public Map<String, String> readHdfsFile(String hdfsPath) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource("resources/hdfs-site.xml");
        conf.addResource("resources/core-site.xml");
        fileSystem = FileSystem.get(conf);
        Path path = new Path(hdfsPath);
        InputStream in = fileSystem.open(path);
        List<String> lines = IOUtils.readLines(in);
        IOUtils.closeQuietly(in);
        if (null == lines || lines.isEmpty()) {
            return null;
        }
        Map<String, String> map = Maps.newHashMap();
        int rowNum = 0;
        for (String line : lines) {
            rowNum++;
            // skip blank or malformed lines instead of failing on them
            String[] content = line.split("=");
            if (StringUtils.isEmpty(line) || content.length < 2
                    || StringUtils.isEmpty(content[1])) {
                logger.error("invalid config line {}: {}", rowNum, line);
                continue;
            }
            map.put(content[0], content[1]);
        }
        return map;
    }
}