A Simple HDFS Programming Example: Merging Files

Posted: 2024-04-17 07:14:47

The directory listing at "localhost:50070/explorer.html#/user/hadoop" (the HDFS web interface) shows all of the files under /user/hadoop.
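
If you want to reproduce the example on a single-node cluster, a minimal setup sketch along the following lines can populate that directory first. The file names file1.txt, file2.txt, and file3.abc are hypothetical stand-ins, chosen so that exactly one of them is excluded by the ".abc" filter used below:

package mergeFile;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical setup helper: creates a few small sample files under
// /user/hadoop so that the merge program below has input to work on.
// The file names are examples only, not taken from the original post.
public class Setup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

        String[] names = {"file1.txt", "file2.txt", "file3.abc"};
        for (String name : names) {
            Path p = new Path("/user/hadoop/" + name);
            try (FSDataOutputStream out = fs.create(p)) { // create() overwrites by default
                out.write(("contents of " + name + "\n").getBytes("UTF-8"));
            }
        }
        fs.close();
    }
}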

We will perform the following operations on the files in this directory:

First, select all files in the directory whose names do not end with ".abc".

Then, read each of the selected files.

Finally, merge their contents into the file "hdfs://localhost:9000/user/hadoop/merge.txt".

 

The complete code is as follows:

package mergeFile;

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;


class MyPathFilter implements PathFilter {  // filters out files whose names match the given regex
    String reg = null;

    MyPathFilter(String reg) {
        this.reg = reg;
    }

    public boolean accept(Path path) {
        // Accept a path only if its string form does NOT match the pattern
        return !path.toString().matches(reg);
    }
}

public class Merge {
    Path inputPath = null;  // directory containing the files to be merged
    Path outputPath = null; // path of the merged output file

    public Merge(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);

        // List the source directory, excluding every file whose name ends in ".abc"
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(".*\\.abc"));
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);

        // Read each remaining file and append its contents to the output file
        for (FileStatus sta : sourceStatus) {
            System.out.println("Path: " + sta.getPath() + "   Size: " + sta.getLen()
                    + "   Permission: " + sta.getPermission() + "   Content:");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);    // echo the contents to the console
                fsdos.write(data, 0, read); // append the contents to merge.txt
            }
            fsdis.close(); // close each input stream once it has been drained
        }
        fsdos.close();
    }
    public static void main(String[] args) throws IOException {
        Merge merge = new Merge("hdfs://localhost:9000/user/hadoop/",
                "hdfs://localhost:9000/user/hadoop/merge.txt");
        merge.doMerge();
    }
}
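
To verify the merge, a small sketch like the one below (assuming the same cluster address and output path as above) reads merge.txt back and streams it to the console with Hadoop's IOUtils.copyBytes:

package mergeFile;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Sketch for verifying the merge: prints the merged file to stdout.
public class ReadMerged {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        try (FSDataInputStream in = fs.open(new Path("/user/hadoop/merge.txt"))) {
            // copyBytes streams the file to System.out; 'false' leaves System.out open
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.close();
    }
}

One caveat: listStatus returns subdirectories as well as regular files, and open() on a directory fails, so in practice you might also check sta.isFile() before reading each entry.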

Execution result: for each file that passes the filter, the program prints its path, size, and permissions followed by its contents, and the concatenated contents are written to /user/hadoop/merge.txt.