使用bulkload向hbase中批量写入数据

时间:2023-03-09 04:59:34
使用bulkload向hbase中批量写入数据

1、数据样式

写入之前,需要整理以下数据的格式,之后将数据保存到hdfs中,本例使用的样式如下(用tab分开):

row1	N
row2 M
row3 B
row4 V
row5 N
row6 M
row7 B

2、代码

假设要将以上样式的数据写入到hbase中,列族为cf,列名为colb,可以使用下面的代码(参考)

 package com.testdata;

 import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TestBulkLoad { public static class LoadMapper extends Mapper<Object,Text,ImmutableBytesWritable,Put>{ @Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] values = value.toString().split("\t");
if(values.length ==2 ){
byte[] rowkey = Bytes.toBytes(values[0]);
byte[] col_value = Bytes.toBytes(values[1]);
byte[] familly = Bytes.toBytes("cf");
byte[] column = Bytes.toBytes("colb");
ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey);
Put testput = new Put(rowkey);
testput.add(familly,column,col_value);
context.write(rowkeyWritable, testput);
} }
}
public static void main(String[] args) throws Exception {
if(args.length !=4 ){
System.exit(0);
} String in = args[0];
String out = args[1];
int unitmb =Integer.valueOf(args[2]);
String tbname = args[3]; Configuration conf = new Configuration();
conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024)); Job job = new Job(conf);
FileInputFormat.addInputPath(job, new Path(in));
FileOutputFormat.setOutputPath(job, new Path(out));
job.setMapperClass(LoadMapper.class);
job.setReducerClass(PutSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setJarByClass(TestBulkLoad.class); Configuration hbaseconf = HBaseConfiguration.create();
HTable table = new HTable(hbaseconf,tbname);
HFileOutputFormat2.configureIncrementalLoad(job, table); job.waitForCompletion(true);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
loader.doBulkLoad(new Path(out), table); } }

这段代码使用mapreduce程序对数据做了进一步处理,之后调用相关的api将数据写入hbase中。PutSortReducer是一个自带的reducer类,不需要再进行编写。

3、执行

数据保存在TEXT文件中,上面代码导出的jar包为bulkload,hbase的数据表名称为testdata,注意,先指定以下HADOOP_CLASSPATH,避免出错。

1 export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
2 hadoop jar ./Downloads/bulkload.jar com.testdata.TestBulkLoad Test hbasedata 64 testdata

4、结果

使用bulkload向hbase中批量写入数据