Combine small files into a SequenceFile

Date: 2023-03-08 21:46:36

Combining small files into a SequenceFile or into Avro files is a good way to feed data into Hadoop.

Many small files in Hadoop consume extra NameNode memory, because the NameNode keeps an in-memory entry for every file and block.

A SequenceFile is a key-value file format; in MapReduce it is read through SequenceFileInputFormat.

The key and value types can supply their own serialization and deserialization logic.
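In Hadoop this means implementing the Writable interface, which Text and the other built-in types already do. Below is a minimal sketch of a custom value type; the class FileRecord and its two fields are illustrative, not part of the original post.

package myexamples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal sketch of a custom SequenceFile value type (illustrative).
public class FileRecord implements Writable {
    private String name; // source file name
    private long size;   // file size in bytes

    public void write(DataOutput out) throws IOException {
        // serialization: define how the fields are written out
        out.writeUTF(name);
        out.writeLong(size);
    }

    public void readFields(DataInput in) throws IOException {
        // deserialization: read the fields back in the same order
        name = in.readUTF();
        size = in.readLong();
    }
}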

Sample SequenceFile content (screenshot omitted).

The default separator between key and value is \001, which matches Hive's default field delimiter, so such files can conveniently be loaded straight into Hive.
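As a hypothetical illustration of that convention, the sketch below joins the columns of one row with \001 before wrapping them in a Text value; the class name HiveRowDemo and the column values are made up.

package myexamples;

import org.apache.hadoop.io.Text;

// Hypothetical sketch: building one Hive-compatible row as a SequenceFile value.
public class HiveRowDemo {
    public static void main(String[] args) {
        // Columns joined by \001, Hive's default field delimiter;
        // the column values are made-up sample data.
        String row = "a.txt" + "\001" + "hadoop" + "\001" + "1024";
        Text value = new Text(row);
        System.out.println("row length in bytes: " + value.getLength());
    }
}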

The following code is for reference only; when using it in a real project, adjust it as needed to save resources and meet the project's requirements.

The sample code is as follows:

package myexamples;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class localf2seqfile {
    /*
     * A local folder holds many small txt files that we want to process
     * in MapReduce, so we pack them into one sequence file:
     *   key:   source file name
     *   value: file content
     */
    static void write2Seqfile(FileSystem fs, Path hdfspath, HashMap<Text, Text> hm) {
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, fs.getConf(), hdfspath, Text.class, Text.class);
            for (Map.Entry<Text, Text> entry : hm.entrySet())
                writer.append(entry.getKey(), entry.getValue());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {           // avoid an NPE when createWriter failed
                try { writer.close(); } catch (IOException ioe) {}
            }
        }
    }

    static HashMap<Text, Text> collectFiles(String localpath) throws IOException {
        HashMap<Text, Text> hm = new HashMap<Text, Text>();
        File f = new File(localpath);
        if (!f.isDirectory())
            return hm;
        for (File file : f.listFiles())
            if (file.isFile())              // skip subdirectories
                hm.put(new Text(file.getName()), new Text(FileUtils.readFileToString(file)));
        return hm;
    }

    static void readSeqFile(FileSystem fs, Path hdfspath) throws IOException {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, hdfspath, fs.getConf());
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {   // iterate over all key/value pairs
            System.out.print(key + " ");
            System.out.println(value);
        }
        reader.close();
    }

    public static void main(String[] args) throws IOException {
        args = "/home/hadoop/test/sub".split(" ");  // hard-coded local source folder for the demo
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://namenode:9000");  // fs.defaultFS in newer Hadoop
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.getUri());
        Path file = new Path("/user/hadoop/seqfiles/seqdemo.seq");
        if (fs.exists(file))
            fs.delete(file, false);                 // remove the previous demo output
        HashMap<Text, Text> hm = collectFiles(args[0]);
        write2Seqfile(fs, file, hm);
        readSeqFile(fs, file);
        fs.close();
    }
}
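Once the sequence file is in HDFS, it can be consumed by a MapReduce job through SequenceFileInputFormat, which hands each (file name, file content) pair straight to the mapper. The map-only driver below is a sketch of that step; the class names SeqFileDriver and FileLengthMapper and the output path /user/hadoop/seqout are assumptions, not part of the original post.

package myexamples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch of a job that reads the packed sequence file (illustrative names).
public class SeqFileDriver {
    public static class FileLengthMapper extends Mapper<Text, Text, Text, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // key = original file name, value = whole file content
            context.write(key, new IntWritable(value.getLength()));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "seqfile demo");
        job.setJarByClass(SeqFileDriver.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);  // read key/value pairs directly
        job.setMapperClass(FileLengthMapper.class);
        job.setNumReduceTasks(0);                                // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/seqfiles"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/seqout"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}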