MapReduce: reading from one HBase table and writing to another HBase table

Posted: 2023-03-10 02:24:07
A MapReduce job scans one HBase table (tb_user) via TableMapReduceUtil and writes the reduced results as Puts into another HBase table (usertags). The driver, mapper, and reducer are shown below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class LabelJob {
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() picks up hbase-site.xml (ZooKeeper quorum etc.),
        // which a bare new Configuration() would miss.
        Job job = Job.getInstance(HBaseConfiguration.create());
        job.setJarByClass(LabelJob.class);
        job.setJobName("Hbase.LabelJob");

        Configuration conf = job.getConfiguration();
        conf.set("tablename", "product_tags"); // passed to the mapper's setup()

        Scan scan = new Scan();
        scan.setCaching(500);       // fetch 500 rows per RPC
        scan.setCacheBlocks(false); // don't pollute the block cache during a full scan

        // Input table: scan tb_user and feed each row to LabelMapper
        TableMapReduceUtil.initTableMapperJob("tb_user", scan, LabelMapper.class, Text.class, Text.class, job);
        // Output table: LabelReducer writes Puts into usertags (this also sets the reducer class)
        TableMapReduceUtil.initTableReducerJob("usertags", LabelReducer.class, job);

        job.waitForCompletion(true);
    }
}
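Note that initTableReducerJob does not create the output table, so usertags with column family prodtags must exist before the job runs. A minimal sketch of creating it with the standard HBase 1.x client API (the table and family names come from the code above; the class name CreateOutputTable is just for illustration):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateOutputTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("usertags");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("prodtags")); // family used by the reducer's Put
                admin.createTable(desc);
            }
        }
    }
}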


import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class LabelMapper extends TableMapper<Text, Text> {
    private String tablename;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Table name passed in by the driver through the job configuration
        tablename = context.getConfiguration().get("tablename");
        // ... (rest of setup omitted in the original)
    }

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
            throws IOException, InterruptedException {
        String userid = Bytes.toString(rowKey.get()); // row key of the HBase user table
        // Read the label string from column family "labels", qualifier "label"
        String strlabel = Bytes.toString(result.getValue(Bytes.toBytes("labels"), Bytes.toBytes("label")));
        String[] userLabels = strlabel.split(",");
        // ... (emit logic omitted in the original; see the sketch below)
    }
}
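The emit step is elided in the original. A minimal, purely illustrative completion, assuming the reducer is meant to aggregate label occurrences per user (so the mapper emits one (userid, label) pair per parsed label):

// Hypothetical emit (not in the original): one (userid, label) pair per label
for (String label : userLabels) {
    context.write(new Text(userid), new Text(label));
}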


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class LabelReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String rowKey = key.toString(); // map output key, reused as the output row key
        Map<String, Long> tagMap = new HashMap<>();
        for (Text v : values) { // count occurrences of each tag
            Long count = tagMap.get(v.toString());
            tagMap.put(v.toString(), (count == null) ? 1L : (count + 1L));
        }
        // Serialize the counts into a single cell value, e.g. "tag1:3,tag2:1"
        StringBuilder outputLabel = new StringBuilder();
        for (Map.Entry<String, Long> e : tagMap.entrySet()) {
            outputLabel.append(outputLabel.length() > 0 ? "," : "").append(e.getKey()).append(':').append(e.getValue());
        }
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("prodtags"), Bytes.toBytes("prodtags"), Bytes.toBytes(outputLabel.toString()));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
    }
}
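Once the job completes, a row can be spot-checked with a plain client Get. A sketch; "some_user_id" below is a placeholder row key, not a value from the original:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckUserTags {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("usertags"))) {
            Result r = table.get(new Get(Bytes.toBytes("some_user_id"))); // placeholder row key
            byte[] value = r.getValue(Bytes.toBytes("prodtags"), Bytes.toBytes("prodtags"));
            System.out.println(value == null ? "no tags found" : Bytes.toString(value));
        }
    }
}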