在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数

时间:2023-03-08 23:28:47
在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数

最近开始使用MapReduce,发现网上大部分示例都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat即可。对于文本数据处理,这个类能够满足一部分应用场景。但是,如果要处理以二进制形式存储的结构化记录文件,这些类就不再适用了。

本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点是如何处理输入数据。参考《Hadoop权威指南(第三版)》可知,需要继承FileInputFormat这个类,并实现以下三个方法:

/* Outline of the three methods a custom input format implements; method bodies are omitted here. */
class MyInputFormat extends FileInputFormat<Type1, Type2> {
/*
* Reports whether the given file may be split: "true" means it can be split,
* "false" means it must not be split.
*/
protected boolean isSplitable(Configuration conf, Path path) { } /*
* The MapReduce client calls this method to obtain all splits and then sends them to the
* MapReduce server side.
* Note that a split does not contain the actual data, only a description of where it is:
* the file path, the start offset within that file, the length of the split, and the list of
* machines holding the underlying data. Implementing this method means filling in that information.
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
} /*
* A RecordReader produces the sequence of key-value pairs passed to the map function. It is given
* two arguments: a split and the job configuration (context).
* The run method of Mapper shows how the MapReduce framework drives a map task:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* Calls nextKeyValue on the RecordReader to produce the next key-value pair. Once the current
* split has been fully processed, nextKeyValue returns false and run exits.
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
**/
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
}

在RecordReader类中需要实现以下几个方法:

/* Outline of the RecordReader methods to implement; method bodies are omitted here. */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
/* Closes the file stream.
* */
public void close() {} /*
* Reports the processing progress.
**/
public float getProgress() {} /*
* Returns the current key.
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {} /* Returns the current value.
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {} /*
* Performs initialization: opens the file stream and sets the start position, length and so on
* from the split information.
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {} /* Produces the next key-value pair.
**/
public boolean nextKeyValue() throws IOException, InterruptedException {
}
}

以下是三个文件的代码,首先是BinInputFormat.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.examples.BinRecordReader; class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> { private static final double SPLIT_SLOP=1.1; /*
* 查询推断当前文件能否够分块?"true"为能够分块,"false"表示不进行分块
*/
protected boolean isSplitable(Configuration conf, Path path) {
return true;
} /*
* MapReduce的client调用此方法得到全部的分块,然后将分块发送给MapReduce服务端。
* 注意,分块中不包括实际的信息,而仅仅是对实际信息的分块信息。详细的说,每一个分块中
* 包括当前分块相应的文件路径,当前分块在该文件里起始位置,当前分块的长度以及相应的
* 实际数据所在的机器列表。在实现这个函数时,将这些信息填上就可以。
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
long minSplitSize = conf.getLong("mapred.min.split.size",1);
long maxSplitSize = conf.getLong("mapred.max.split.size", 1);
long blockSize = conf.getLong("dfs.block.size",1);
long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
FileSystem fs = FileSystem.get(conf);
String path = conf.get(INPUT_DIR);
FileStatus[] files = fs.listStatus(new Path(path)); for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
FileStatus file = files[fileIndex];
System.out.println("input file: " + file.getPath().toString());
long length = file.getLen();
FileSystem fsin = file.getPath().getFileSystem(conf);
BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(conf, file.getPath())) {
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(file.getPath(), length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
splits.add(new FileSplit(file.getPath(), length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
}
}
return splits;
} /*
* 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的參数有两个:一个分块(split)和作业的配置信息(context).
* 在Mapper的run函数中能够看到MapReduce框架运行Map的逻辑:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* 调用RecordReader方法的nextKeyValue,生成新的键值对。假设当前分块(Split)中已经处理完成了,则nextKeyValue会返回false.退出run函数
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
**/
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
BinRecordReader reader = new BinRecordReader();
reader.initialize(split,context);
return reader;
}
}

下面为BinRecordReader.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader; /**
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
private FSDataInputStream inputStream = null;
private long start,end,pos;
private Configuration conf = null;
private FileSplit fileSplit = null;
private LongWritable key = new LongWritable();
private IntWritable value = new IntWritable();
private boolean processed = false;
public BinRecordReader() throws IOException {
} /*关闭文件流
* */
public void close() {
try {
if(inputStream != null)
inputStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} /*
* 获取处理进度
**/
public float getProgress() {
return ((processed == true)? 1.0f : 0.0f);
} /*
* 获取当前的Key
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return key;
} /* 获取当前的Value
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {
// TODO Auto-generated method stub
return value;
} /*
* 进行初始化工作,打开文件流,依据分块信息设置起始位置和长度等等
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
fileSplit = (FileSplit)inputSplit;
conf = context.getConfiguration(); this.start = fileSplit.getStart();
this.end = this.start + fileSplit.getLength(); try{
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
this.inputStream = fs.open(path);
inputStream.seek(this.start);
this.pos = this.start;
} catch(IOException e) {
e.printStackTrace();
}
} /*生成下一个键值对
**/
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(this.pos < this.end) {
key.set(this.pos);
value.set(Integer.reverseBytes(inputStream.readInt()));
this.pos = inputStream.getPos();
return true;
} else {
processed = true;
return false;
}
}
}

以下是主文件IntCount.java的代码(文件名须与其中的public类IntCount一致):

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.examples.BinInputFormat; public class IntCount {
public static class TokenizerMapper extends Mapper<LongWritable, IntWritable, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1);
private Text intNum = new Text(); public void map(LongWritable key, IntWritable value, Context context
) throws IOException, InterruptedException {
intNum.set(Integer.toString(value.get()));
context.write(intNum, one);
}
} public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); }
result.set(sum);
context.write(key, result);
}
} public static void main(String[] args) throws Exception {
System.out.println("testing1");
Configuration conf = new Configuration();
String[] newArgs = new String[]{"hdfs://localhost:9000/read","hdfs://localhost:9000/data/wc_output21"};
String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "IntCount");
job.setJarByClass(IntCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//设置自己定义的输入类
job.setInputFormatClass(BinInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

接着我们用一段C语言程序生成以二进制格式存储的文件,C语言代码如下:

#include<stdio.h>
/*
 * Writes the values 0..9, repeated ten times, to "tmpfile" as raw ints in the
 * machine's native representation.
 *
 * NOTE(review): the Java reader in this article byte-swaps each 4-byte value,
 * so this presumably targets a platform with 4-byte little-endian int - confirm.
 *
 * Returns 0 on success and 1 if the file cannot be opened, written or closed.
 */
int main(){
    FILE *fp = fopen("tmpfile", "wb");
    int i, j;

    if (fp == NULL) {
        perror("tmpfile");
        return 1;
    }
    for (i = 0; i < 10; i++) {
        for (j = 0; j < 10; j++) {
            if (fwrite(&j, sizeof(int), 1, fp) != 1) {
                perror("fwrite");
                fclose(fp);
                return 1;
            }
        }
    }
    /* fclose flushes buffered data, so a failure here means data was lost. */
    if (fclose(fp) != 0) {
        perror("fclose");
        return 1;
    }
    return 0;
}

将生成的文件复制到/read/下,接着启动IntCount这个MapReduce程序,打开执行结果:

在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数