MapReduce、Hbase接口API实践

时间:2023-03-10 01:46:17
MapReduce、Hbase接口API实践

读取hdfs中文件并做处理,取出卡号,通过卡号连接hbase查询出对应客户号,写入redis,因为不用输出,所以不调用context.write方法,整个操作在一个map中便可完成

protected HTable connect
//setup方法被MapReduce框架仅且执行一次,在执行Map任务前,进行相关变量或者资源的集中初始化工作。若是将资源初始化工作放在方法map()中,导致Mapper任务在解析每一行输入时都会进行资源初始化工作,导致重复,程序运行效率不高!
protected void setup(Context context) throws IOExcption,InterruptedException{
super.setup(context)
String jobName = context.getJobName();
//文件索引值
cartNoIndex = conf.get(jobName + "source.key","7");
//创建hbase连接,hbase-site.xml配置文件需要在jar包中
Configuration config = HBaseConfiguration.create();
connect = new HTable(config,"tableName")
} protected void map(writable key,Text value,Context context){
if(value == null || value.toString().trim().isEmpty()){
//计数器,记录处理的条数
context.getCounter(....).increment(1);
}else{
String[] values = Utils.split(value,separator,true);
//业务逻辑处理
int i = Integer.parseInt(cartNoIndex);
if(i<values.length){
cardNo = values[i];
}else{
logger.error("cardNo cannot find");
} //从hbase中查询出对应客户号
String rowkey = HTableManager.generatRowkey(cardNo);
Get getResult = new Get(rowkey.getBytes());
Result rs = connect.get(getResult);
String curNo = Bytes.toString(rs.getValue("f1".getBytes(),"column_name".getBtes());
RedisClient.getRedisClient().zincrbyset("spending:rank",countNum,custNo); protected void cleanup(context context)throws IOException,InterruptedException{
  super.cleanup(context);
  connect.close();
}
public static String[] split(String value,String separator,boolean trimSpace){
String[] rtn = split(value.separator);
if(trimSpace && rtn != null){
for(int i=0;i<rtn.length;i++){
rtn[i] = rtn[i].trim();
}
}
return rtn;
} public static String[] split(String value,String separator){
String[] rtn = null;
if(value != null){
boolean endBlank = false;
if(value.endsWith(separator)){
value +=" ";
endBlank = true;
}
separator = escapeExprSpecialWord(deparator);
if(endBlank){
rtn(rtn.length-1) = "";
}
}
return rtn;
} public static String escapeExprSpecialWord(String keyWord){
if(keyword != null && !keyword.isEmpty()){
String[] fbsArr = {"\\","|","(",")"};
for(String key : fbsArr){
if(keyword.contains(key){
keyword = keyword.replace(key,"\\"+key);
}
}
}
return keyword;
}