用java api读取HDFS文件

时间:2025-05-05 21:34:14
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import bean.TableStatistic; @Controller
@RequestMapping("/dfview")
public class DataFrameViewController extends BaseController { private ConcurrentMap<String, UserGroupInformation> cache = new ConcurrentHashMap<String, UserGroupInformation>();
private ConcurrentMap<String, FileSystem> fileSystemCache = new ConcurrentHashMap<String, FileSystem>();
private Configuration hadoopConf = new Configuration();
private static final String HDFS_JSON_NAME = "jsonObj"; @RequestMapping(value = "/getDFviewOfColumn", method = { RequestMethod.GET })
@ResponseBody
public TableStatistic getDFviewOfTable(String tableName)
throws Exception {
String user = "bi";
String dirpath = "/user/cbt/datax/temp_transfer/zzzdes";
Path homePath = new Path(dirpath);
FileSystem fs = this.createFileSystem(user);
FileStatus[] stats = fs.listStatus(homePath);
StringBuffer txtContent = new StringBuffer();
for (int i = 0; i < stats.length; ++i) {
if (stats[i].isFile()) {
FileStatus file = stats[i];
if( HDFS_JSON_NAME.equalsIgnoreCase(file.getPath().getName())){
InputStream in = fs.open(file.getPath());
byte[] b = new byte[1];
while (in.read(b) != -1)
{
// 字符串拼接
txtContent.append(new String(b));
}
in.close();
break;
}
}
}
TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class);
return ts;
} public static void main(String[] args) throws Exception {
DataFrameViewController aaa = new DataFrameViewController();
FileSystem fs = aaa.createFileSystem("bi");
Path homePath = new Path("/user/cbt/datax/temp_transfer/zzzdes");
System.out.println("***********************************");
FileStatus[] stats = fs.listStatus(homePath);
for (int i = 0; i < stats.length; ++i) {
if (stats[i].isFile()) {
FileStatus file = stats[i];
StringBuffer txtContent = new StringBuffer();
if( "jsonObj".equalsIgnoreCase(file.getPath().getName())){
InputStream in = fs.open(file.getPath());
byte[] b = new byte[1];
while (in.read(b) != -1)
{
// 字符串拼接
txtContent.append(new String(b));
}
// IOUtils.copyBytes(fs.open(file.getPath()), System.out, 4096,false);
in.close();
// fs.close();
}
System.out.print(txtContent.toString());
System.out
.println("************************************************");
JSONObject jb = JSON.parseObject(txtContent.toString());
System.out.println("********!!!!! : " + jb.get("colUnique"));
TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class);
System.out.println("********!!!!! : " + ts.getColUnique().toString()); } else if (stats[i].isDirectory()) {
System.out.println(stats[i].getPath().toString());
} else if (stats[i].isSymlink()) {
System.out.println("&&&&&&&&" + stats[i].getPath().toString());
} }
FsStatus fsStatus = fs.getStatus(homePath);
} public FileSystem createFileSystem(String user) throws Exception {
final Configuration conf = loadHadoopConf();
conf.set("hadoop.job.ugi", user);
// conf.set("HADOOP_USER_NAME", user);
if (fileSystemCache.get(user) != null) {
return fileSystemCache.get(user);
}
UserGroupInformation ugi = getProxyUser(user);
FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
fileSystemCache.put(user, fs);
return fs;
} public static final ThreadLocal<SimpleDateFormat> appDateFormat = new ThreadLocal<SimpleDateFormat>() {
@Override
public SimpleDateFormat initialValue() {
SimpleDateFormat dateformat = new java.text.SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
return dateformat;
}
}; private static final String[] HADOOP_CONF_FILES = { "core-site.xml",
"hdfs-site.xml" }; private Configuration loadHadoopConf() {
if (hadoopConf != null) {
return hadoopConf;
}
Configuration conf = new Configuration();
for (String fileName : HADOOP_CONF_FILES) {
try {
InputStream inputStream = DataFrameViewController.class
.getClassLoader().getResourceAsStream(fileName);
conf.addResource(inputStream);
} catch (Exception ex) {
}
}
return conf;
} public void destroy() {
for (UserGroupInformation ugi : cache.values()) {
try {
FileSystem.closeAllForUGI(ugi);
} catch (IOException ioe) {
// Logger.error("Exception occurred while closing filesystems for "
// + ugi.getUserName(), ioe);
}
}
cache.clear();
} private UserGroupInformation getProxyUser(String user) throws IOException {
cache.putIfAbsent(user, UserGroupInformation.createRemoteUser(user));
return cache.get(user);
}
}