HDFS-JavaAPI

时间:2023-03-10 06:12:28
HDFS-JavaAPI

一、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>yofc</groupId>
<artifactId>root</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies> <build>
<plugins>
<!-- 指定jdk -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

二、测试

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import java.io.IOException;
import java.net.URI; public class HDFSClient { private Configuration conf;
private FileSystem fs; @Before
public void init() throws Exception {
// 设置 HADOOP_HOME 环境变量
System.setProperty("hadoop.home.dir", "D:/DevelopTools/hadoop-2.9.2/");
// 日志初始化
BasicConfigurator.configure(); conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://192.168.8.136:9000");
//fs = FileSystem.get(conf ); // 获取 hdfs 客户端对象,指定用户名,避免无权限
fs = FileSystem.get(new URI("hdfs://192.168.8.136:9000"), conf, "root");
} @After
public void close() throws IOException {
fs.close();
} // 在 hdfs 上创建文件夹
@Test
public void mkdirs() throws IOException {
fs.mkdirs(new Path("/10086/"));
}
}

HDFS-JavaAPI

文件上传

@Test
public void testCopyFromLocalFile() throws Exception{
fs.copyFromLocalFile(new Path("D:/MyFile/Downloads/Writage-1.12.msi"), new Path("/Writage-1.12.msi"));
}

手动 IO 流方式

@Test
public void putFileToHDFS() throws Exception{
// 获取输入流
FileInputStream fis = new FileInputStream(new File("D://MyFile/Downloads/Writage-1.12.msi"));
// 获取输出流
FSDataOutputStream fos = fs.create(new Path("/Writage-1.12.msi"));
// 流的对拷
IOUtils.copyBytes(fis, fos, conf);
// 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}

文件下载

@Test
public void testCopyToLocalFile() throws Exception{
// fs.copyToLocalFile(new Path("/AAA.txt"), new Path("d:/BBB.txt"));
/**
* delSrc:是否删除原数据
* src:hdfs 路径
* dst:本地 路径
* useRawLocalFileSystem:crc 文件完整性校验
*/
fs.copyToLocalFile(false, new Path("/Writage-1.12.msi"), new Path("D://Writage.msi"), true);
}

手动 IO 流方式

@Test
public void getFileFromHDFS() throws Exception{
// 获取输入流
FSDataInputStream fis = fs.open(new Path("/Writage-1.12.msi"));
// 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D://Writage.msi"));
// 流的对拷
IOUtils.copyBytes(fis, fos, conf);
// 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}

分块方式,这里要下载的文件被 hdfs 切割成了 3 块

// 下载第一块
@Test
public void readFileSeek1() throws Exception{
// 获取输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz"));
// 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz"));
// 流的对拷(只拷贝128m)
byte[] buf = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fis.read(buf);
fos.write(buf);
}
// 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}
// 下载第二块
@Test
public void readFileSeek2() throws Exception{
// 获取输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz"));
// 设置指定读取的起点
fis.seek(1024*1024*128);
// 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz2"));
// 流的对拷(只拷贝128m)
byte[] buf = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
fis.read(buf);
fos.write(buf);
}
// 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}
// 下载第三块
@Test
public void readFileSeek3() throws Exception{
// 获取输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.9.2-win10-64.tar.gz"));
// 设置指定读取的起点
fis.seek(1024*1024*128*2);
// 获取输出流
FileOutputStream fos = new FileOutputStream(new File("D://hadoop-2.9.2-win10-64.tar.gz3"));
// 流的对拷
IOUtils.copyBytes(fis, fos, conf);
// 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}

分块下载完毕后合并文件

# Windows 环境下

# 将 hadoop-2.9.2-win10-64.tar.gz2 追加到  hadoop-2.9.2-win10-64.tar.gz
type hadoop-2.9.2-win10-64.tar.gz2 >> hadoop-2.9.2-win10-64.tar.gz
# 将 hadoop-2.9.2-win10-64.tar.gz3 追加到 hadoop-2.9.2-win10-64.tar.gz
type hadoop-2.9.2-win10-64.tar.gz3 >> hadoop-2.9.2-win10-64.tar.gz # 最后 hadoop-2.9.2-win10-64.tar.gz 就是一个完整的文件了

文件删除

@Test
public void testDelete() throws Exception{
/**
* var1:hdfs 路径
* var2:是否递归删除,若为文件夹则必须为 true
*/
fs.delete(new Path("/Writage-1.12.msi"), true);
}

重命名

@Test
public void testRename() throws Exception{
// 把根目录的 10086 改为 mkmk
fs.rename(new Path("/10086/"), new Path("/mkmk/"));
}

查看文件详情

@Test
public void testListFiles() throws Exception{
// 递归获取根目录下的所有文件
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus fileStatus = listFiles.next();
// 文件名称
System.out.println(fileStatus.getPath().getName());
// 文件权限
System.out.println(fileStatus.getPermission());
// 文件长度
System.out.println(fileStatus.getLen());
// 文件块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 文件块所在的主机名
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("-------------------");
}
}

判断是文件还是文件夹

@Test
public void testListStatus() throws Exception{
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
System.out.println("文件:"+fileStatus.getPath().getName());
}else{
System.out.println("文件夹:"+fileStatus.getPath().getName());
}
}
}

Windows 运行 Hadoop 问题:https://wiki.apache.org/hadoop/WindowsProblems