Sample MultipleFileWordcount CombineFileInputFormat

时间:2021-10-25 01:59:31

在旧版本的samples中,使用的是旧的api,mapred下面的MultiFileInputFormat,现在已经过时。

现在推荐使用mapreduce下面的CombineInputFormat来处理。

应用场景:

如果文件数量大,而且单个文件又比较小,若是使用FileInputFormat进行分片,则会根据一个文件生成一个分片,

每个分片又丢给一个maptask,这样maptask处理的内容太小,很快就完成了,利用率不高,因为maptask本身启动

处理所占的时间和资源消耗就超过了信息处理本身所占的时间。推荐一个maptask至少运行一分钟左右。

解决方案:

使用combinefileinputformat来重定义了getSplits方法,这样可以根据我们指定的splitsize(一般是给定为blocksize大小,减少数据传输)

,打包多个小文件到一个inputsplit中去。这样减少了框架生成的maptask的数量。

示例:

例如我的englishwords目录下面有四个文件,使用wordcount示例来跑的话,默认生成4个maptask(不考虑失败又生成的maptask)一个reducetask.

使用旧版的api生成了2个maptask,使用新版的multiplefilewordcount示例生成了一个maptask.

CombineFileInputformat 中可以重写的一个重要方法是:

/**
* Specify the maximum size (in bytes) of each split. Each split is
* approximately equal to the specified size.
*/
protected void setMaxSplitSize(long maxSplitSize) {
this.maxSplitSize = maxSplitSize;
}

示例中又自己写了一个数据结构wordoffset, 是因为原来的只考虑一个文件(一个分片一个文件)中的信息,所以key是offset,value是当前行的值。

现在一个分片中会有多个文件,所以新的数据结构wordoffset就表示哪个文件的offset,这样更明晰。

有时候我们在项目中就需要自己定义maptask的参数。这个结构是需要实现writable接口的(可以序列化)。

使用CombineFileInputFormat最重要的就是实现 Reader的方法,Reader中最重要的就是next().

基本思路其实和单个文件的是类似的, 只是在这种情况下需要处理多个文件的情况,需要有一个index来标志是正在处理哪个文件。

一般在combineReader里面会有如下的代码:

public static class CombineFileLineRecordReader
extends RecordReader<WordOffset, Text> { private long startOffset; //offset of the chunk;
private long end; //end of the chunk;
private long pos; // current pos
private FileSystem fs;
private Path path;
private WordOffset key;
private Text value; private FSDataInputStream fileIn;
private LineReader reader; public CombineFileLineRecordReader(CombineFileSplit split,
TaskAttemptContext context, Integer index) throws IOException { this.path = split.getPath(index);
fs = this.path.getFileSystem(context.getConfiguration());
this.startOffset = split.getOffset(index);
this.end = startOffset + split.getLength(index);
boolean skipFirstLine = false; //open the file
fileIn = fs.open(path);
if (startOffset != 0) {
skipFirstLine = true;
--startOffset;
fileIn.seek(startOffset);
}
reader = new LineReader(fileIn);
if (skipFirstLine) { // skip first line and re-establish "startOffset".
startOffset += reader.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
}
this.pos = startOffset;
}

…………