Using MapReduce with HBase -- table2file

Posted: 2021-12-02 12:47:59

Official manual: http://hbase.apache.org/book.html#mapreduce.example

A simple example job: scan an HBase table and write its contents out to a file on HDFS.
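The job below reads a table named test1 with a column family info and a qualifier name. If you need to create and populate such a table first, here is a minimal sketch using the HBase 1.x client API (the table name, family, and sample row are just the assumptions this walkthrough uses):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public class CreateTestTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                TableName name = TableName.valueOf("test1");
                if (!admin.tableExists(name)) {
                    // One column family "info" is all the MapReduce job reads.
                    HTableDescriptor desc = new HTableDescriptor(name);
                    desc.addFamily(new HColumnDescriptor("info"));
                    admin.createTable(desc);
                }
                try (Table table = conn.getTable(name)) {
                    // A single sample row so the scan has something to emit.
                    Put put = new Put("row1".getBytes());
                    put.addColumn("info".getBytes(), "name".getBytes(), "Edward".getBytes());
                    table.put(put);
                }
            }
        }
    }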

RunJob source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Edward on 2016/6/29.
 */
public class RunJob implements Tool {

    private Configuration conf = null;

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = this.getConf();
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf, "etl");
        job.setJarByClass(RunJob.class);
        job.setInputFormatClass(TableInputFormat.class); // also set by initTableMapperJob() below
        job.setOutputFormatClass(TextOutputFormat.class);
        // Fixed: the original passed TextOutputFormat.class as the output key class;
        // the job emits Text keys and Text values.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Fixed: the reducer was never wired in, so the identity reducer ran instead.
        job.setReducerClass(MyReducer.class);

        Scan scan = new Scan();
        scan.setCaching(1024);      // rows fetched per RPC to the region server
        scan.setCacheBlocks(false); // don't evict hot data from the block cache during a full scan

        TableMapReduceUtil.initTableMapperJob(
                "test1",         // source table
                scan,
                MyMapper.class,  // mapper class
                Text.class,      // mapper output key
                Text.class,      // mapper output value
                job);

        // MapReduce refuses to start if the output directory already exists.
        Path path = new Path("/hbase_out");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        boolean b = job.waitForCompletion(true);
        if (b) {
            System.out.println("Job finished successfully");
        }
        return b ? 0 : 1; // nonzero exit code on failure
    }

    @Override
    public void setConf(Configuration configuration) {
        // Submit as "root" (no security enabled on the demo cluster).
        System.setProperty("HADOOP_USER_NAME", "root");
        configuration.set("hbase.zookeeper.quorum", "node1,node2,node3");
        // Ship the job jar when submitting from a Windows IDE.
        configuration.set("mapred.jar", "D:\\etl.jar");
        this.conf = HBaseConfiguration.create(configuration);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new RunJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
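The scan in run() fetches entire rows, but MyMapper only ever reads the info:name cell. If the table carries other columns, the scan can be narrowed so region servers ship just that one column; an optional refinement to the same Scan object:

    Scan scan = new Scan();
    scan.setCaching(1024);
    scan.setCacheBlocks(false);
    // Only transfer the single column the mapper actually reads.
    scan.addColumn("info".getBytes(), "name".getBytes());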

MyMapper source:

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Created by Edward on 2016/6/29.
 */
public class MyMapper extends TableMapper<Text, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Emit (row key, info:name) for every row the scan delivers.
        String val = new String(value.getValue("info".getBytes(), "name".getBytes()));
        String row = new String(value.getRow());
        context.write(new Text(row), new Text(val));
    }
}
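One caveat: Result.getValue() returns null when a row has no info:name cell, so new String(...) above throws a NullPointerException and fails the task on such rows. A defensive variant of the map body (a sketch; the rest of the class is unchanged):

    byte[] raw = value.getValue("info".getBytes(), "name".getBytes());
    if (raw == null) {
        return; // skip rows that have no info:name cell
    }
    context.write(new Text(new String(value.getRow())),
                  new Text(new String(raw)));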

MyReducer source:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Edward on 2016/6/29.
 */
public class MyReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text t : values) {
            context.write(key, t);
        }
    }
}
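Note that MyReducer simply echoes every (row key, name) pair, so the reduce phase only serves to merge and sort the mapper outputs by row key. If a single sorted output file is not required, the shuffle can be skipped entirely by making the job map-only; in run(), instead of setReducerClass(...):

    job.setNumReduceTasks(0); // mapper output is written straight to /hbase_out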