Spark Learning 57 - Picking the most reliable data from HBase by source weight, with Spark and Scala

Date: 2022-11-26 14:54:43

The Scala version of "picking the most reliable data from HBase by source weight" follows the same processing flow as the Java version, but there are quite a few details to watch out for.
Before reading this, see the Java version first: http://blog.csdn.net/qq_21383435/article/details/78735885


Configuration file: /HbaseSparkSQL2/src/config_scala.properties

# is test in windows
isWinTest=no

# zookeeper config
hbase_zookeeper_property_clientPort=2181
hbase_zookeeper_quorum=192.168.10.82

# source table
source.TableName=www:test_person
source.falmily=base
source.fields=number,zjhm,zjlx,xm,xb,csrq,jg,hyzk,whcd,mz,source,sg,tz,zy,grjj
# zero-based index of the source field within source.fields
source.sourceNum=10
# weight of each data source, as sourceCode,weight pairs joined by '='
source.weight=00101,100=00098,82=00176,95=00056,96=00058,94=00108,53=00056,90=00106,80=00094,98=00022,76=00099,32=00102,15=00103,14=00001,70=00027,50=00154,40=00152,30=00128,5

# save table
save.TableName=www:test_person2
save.falmily=base
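
The source.weight entry packs all source weights into a single string: sourceCode,weight pairs joined by '=', so source 00101 has weight 100 and source 00128 has weight 5. For a person that appears in records from several sources, every record votes for its field values with the weight of its source, and for each field the candidate value with the highest total weight is kept. Below is a minimal, self-contained sketch of that idea (not part of the project; it uses a subset of the weights above and the sample name values that appear later in this post):

// Standalone sketch of the weighted-voting idea; not part of the original project.
object WeightVotingExample {
  def main(args: Array[String]): Unit = {
    // a subset of source.weight, parsed the same way init() does: split on '=',
    // then each piece is "sourceCode,weight"
    val raw = "00101,100=00098,82=00022,76=00106,80=00103,14"
    val weights: Map[String, Int] = raw.split("=").map { pair =>
      val Array(code, w) = pair.split(",")
      code -> w.toInt
    }.toMap

    // three conflicting values of the xm (name) field, each tagged with its source
    val votes = Seq(("张山", "00106"), ("张三", "00103"), ("张三", "00022"))
    // total weight per candidate value: 张山 -> 80, 张三 -> 14 + 76 = 90
    val scores = votes.groupBy(_._1).map { case (v, recs) => v -> recs.map(r => weights(r._2)).sum }
    // the heaviest candidate wins
    println(scores.maxBy(_._2)._1) // prints 张三
  }
}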

The code

package scala

import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession

class SparkOnHbase extends java.io.Serializable {

}



object SparkOnHbase extends java.io.Serializable {

  private var _isWinTest: String = _
  private var _hbase_zookeeper_property_clientPort: String = _
  private var _hbase_zookeeper_quorum: String = _

  // the HBase source table to filter
  private var _sorceTableName: String = _
  private var _sorceTableFalmily: String = _
  private var _sourceNumber: Int = _

  // the HBase table the merged result is written to
  private var _saveTableName: String = _
  private var _saveFalmily: String = _

  // mapping from column index to field name, e.g. 0 -> number, 1 -> zjhm
  private var _filedsMap = new ConcurrentHashMap[Integer, String]()
  // mapping from data-source code to its weight, e.g. 00101 -> 100
  private var _sourceWeightMap = new ConcurrentHashMap[String, Integer]()

  // initialize the fields above from config_scala.properties
  init()


  def main(args: Array[String]): Unit = {

    var _sparkSession: SparkSession = SparkSession
      .builder()
      .appName("spark_scala_HBaseTest")
      .master("local[4]")
      // Note: you may need to enable Kryo serialization here, otherwise you can get
      // java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
      //.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    if (_isWinTest.equals("yes")) {
      // local test on Windows
      System.setProperty("hadoop.home.dir", "F:\\02-hadoop\\hadoop-2.7.3\\")
      System.setProperty("HADOOP_USER_NAME", "root")
      System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    } else {
      // running on the cluster: nothing extra to set
    }

    // get the SparkContext
    val sc = _sparkSession.sparkContext

    // create the HBase configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.property.clientPort", _hbase_zookeeper_property_clientPort)
    hBaseConf.set("hbase.zookeeper.quorum", _hbase_zookeeper_quorum)
    //var con = ConnectionFactory.createConnection(hBaseConf)
    //var table = con.getTable(TableName.valueOf(""))

    hBaseConf.set(TableInputFormat.INPUT_TABLE, _sorceTableName)

    // SQLContext (kept from the original flow, although it is not used below)
    val sqlContext = new SQLContext(sc)
    // read the source table as an RDD of (rowkey, Result)
    var hbaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // flatten each HBase Result into one comma-separated line of field values,
    // in the order given by _filedsMap
    val personsRDD = hbaseRDD.map(r => {
      var result: Result = r._2
      var _mystr: StringBuffer = new StringBuffer()
      for (_myindex <- 0 to _filedsMap.size() - 1) {
        var key: String = _filedsMap.get(_myindex)
        //println("_sorceTableFalmily====" + _sorceTableFalmily)
        //println("key====" + key)
        var temp: String = Bytes.toString(result.getValue(Bytes.toBytes(_sorceTableFalmily), Bytes.toBytes(key)))
        if (temp != null && !temp.equals("") && !temp.equals("null") && temp.contains(",")) {
          // the line is split on commas later on, so strip commas inside field values
          temp = temp.replaceAll(",", " ")
        }
        _mystr.append(temp).append(",")
      }
      _mystr.toString()
    })

    // key each line by its first field (the person's number) and wrap it in brackets, e.g.
    // (1x,[1x,1x,ID,张山,男,1991/11/1,河南省息县1,无,高中,汉,00106,86,78,计算机,简介XXXXX])
    var pairRdd = personsRDD.map(r => {
      var arr = r.split(",")
      var line = r.substring(0, r.length() - 1)
      line = "[" + line + "]"
      new Tuple2(arr(0), line.toString())
    })

    // group all records of the same person together
    var groupRdd = pairRdd.groupByKey()


    // keys with exactly one record, e.g.
    // (3x,CompactBuffer([3x,3x,ID,王六,女,1991/11/2,汉南省息县7,无,大学,汉,00154,80,70,计算机,简介XXXXX]))
    var oneDataRDD = groupRdd.filter(r => {
      var _mycount: Integer = 0
      r._2.foreach(line => {
        _mycount = _mycount + 1
      })
      _mycount == 1
    })

    // for those keys, keep the single record with the surrounding brackets stripped, e.g.
    // 3x,3x,ID,王六,女,1991/11/2,汉南省息县7,无,大学,汉,00154,80,70,计算机,简介XXXXX
    var oneDataRDDtoString = oneDataRDD.map(r => {
      var hang: String = ""
      r._2.foreach(line => {
        hang = line.substring(1, line.length - 1)
      })
      hang
    })

    // keys with more than one record, i.e. the same person coming from several sources, e.g.
    // (1x,[
    //   [1x,1x,ID,张山,男,1991/11/1,河南省息县1,无,高中,汉,00106,86,78,计算机,简介XXXXX],
    //   [1x,1x,ID,张三,女,1991/11/2,河南省息县3,无,大学,汉,00103,85,74,null,简介XXXXX],
    //   [1x,1x,ID,张三,男,1991/11/2,河南省息县4,无,初中,汉,00022,80,73,计算机,简介XXXXX]
    // ]
    // )
    var manyDataRDD = groupRdd.filter(r => {
      var _mycount: Integer = 0
      r._2.foreach(line => {
        _mycount = _mycount + 1
      })
      1 < _mycount
    })


    // for keys with several records, merge them into a single line by weighted voting
    var manyDataRDDtoString = manyDataRDD.map(r => {
      var mykey = r._1
      var result = r._2
      // per field, accumulate the total weight of every candidate value, e.g.
      // {csrq={1991/11/2=95, 1991/11/1=80}, zjlx={ID=175}, whcd={大学=10, 高中=80, 初中=85}, tz={78=80, 73=85, 74=10}, xb={女=10, 男=165}, source={00022=85, 00106=80, 00103=10}, zjhm={1x=175}, mz={汉=175}, number={1x=175}, hyzk={无=175}, sg={80=85, 85=10, 86=80}, xm={张三=95, 张山=80}, grjj={简介XXXXX=175}, jg={河南省息县1=80, 河南省息县3=10, 河南省息县4=85}, zy={计算机=165}}
      var mapValue = new ConcurrentHashMap[String, ConcurrentHashMap[String, Integer]]()
      result.foreach(hang => {
        // split the line into its fields
        var shuzu: Array[String] = hang.substring(1, hang.length() - 1).split(",")
        // the source this record came from
        var source = shuzu(_sourceNumber)
        // and the weight of that source
        var weight = _sourceWeightMap.get(source)

        for (index <- 0 to _filedsMap.size() - 1) {
          var key = _filedsMap.get(index)
          var temp: ConcurrentHashMap[String, Integer] = null
          if (mapValue != null && !mapValue.containsKey(key)) {
            temp = new ConcurrentHashMap[String, Integer]()
            mapValue.put(key, temp)
          } else {
            temp = mapValue.get(key)
          }

          // add this record's weight to the candidate value of this field
          if (shuzu(index) != null && !shuzu(index).equals("null") && shuzu(index) != "" && temp.containsKey(shuzu(index))) {
            var yuanlai = temp.get(shuzu(index)).intValue()
            var now = yuanlai + weight
            temp.put(shuzu(index), now)
          } else if (shuzu(index) != null && !shuzu(index).equals("null") && shuzu(index) != "" && !temp.containsKey(shuzu(index))) {
            temp.put(shuzu(index), weight)
          }

          // end of the field loop
        }
        // end of the record loop
      })

      // debug: inspect the accumulated weights, e.g. {1x=170} {张三=90, 张山=80} {女=14, 男=156} {大学=14, 高中=80, 初中=76} ...
      /*for(index <- 0 to _filedsMap.size()-1){
        var key = _filedsMap.get(index)
        println(mapValue.get(key))
      }*/

      // for every field, take the candidate value with the largest accumulated weight
      var lastStr: StringBuffer = new StringBuffer()
      for (i <- 0 to _filedsMap.size() - 1) {
        var key: String = _filedsMap.get(i)
        var mymap: ConcurrentHashMap[String, Integer] = mapValue.get(key)
        var value = ""
        if (mymap.isEmpty()) {
          value = "无数据" // literally "no data": no record supplied this field
        } else if (1 <= mymap.size()) {
          /*
           Here I wanted the Java idiom
             for (Entry<String, Integer> entry : mymap.entrySet()) {
               value = entry.getKey();
             }
           and could not get it to work at first; the tricky part is going the other way
           round, i.e. looking a key up by its value.
           Option 1: use the Java iterator directly
             var a = mymap.entrySet().iterator();
             while (a.hasNext()) {
               var me = a.next();  // split the entry into key and value
               System.out.println(me.getKey() + "--->" + me.getValue());  // print key and value
             }
           Option 2: invert the map; this is clumsy, and getting the key of the largest value
           this way is hard
             import scala.collection.JavaConversions._
             print(".....................>>>>>>>>" + mymap.map(f => {(f._2, f._1)}).get(80).getOrElse(""))
          */
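          // A more idiomatic alternative (a sketch, not used in the original post): view the
          // Java map as a Scala map and take the key with the largest accumulated weight,
          // e.g. with scala.collection.JavaConverters (scala.jdk.CollectionConverters in 2.13):
          //   import scala.collection.JavaConverters._
          //   value = mymap.asScala.maxBy(_._2.intValue())._1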

          var maxValue = 0
          var onlyKey = ""
          var a = mymap.entrySet().iterator()
          while (a.hasNext()) {
            var me = a.next() // split the entry into key and value
            //System.out.println(me.getKey() + "--->" + me.getValue()) // print key and value
            if (maxValue < me.getValue()) {
              maxValue = me.getValue()
              onlyKey = me.getKey()
            }
            value = onlyKey
          }
        }
        lastStr.append(value).append(",")
      }
      lastStr.toString()
    })

    // merge the untouched single-record lines with the merged multi-record lines
    var unionRDD = oneDataRDDtoString.union(manyDataRDDtoString)

    // convert each line to (ImmutableBytesWritable, Put) so it can be written in bulk
    var saveRDD = unionRDD.map(line => {
      // lines built by manyDataRDDtoString end with a ',' that must be stripped;
      // lines from oneDataRDDtoString do not, so only strip an actual trailing comma
      var hang: String = if (line.endsWith(",")) line.substring(0, line.length - 1) else line
      var fileds = hang.split(",")
      var put: Put = new Put(Bytes.toBytes(fileds(0)))
      for (i <- 0 to fileds.length - 1) {
        var filedName = _filedsMap.get(i)
        put.addColumn(Bytes.toBytes(_saveFalmily), Bytes.toBytes(filedName), Bytes.toBytes(fileds(i)))
      }
      new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(), put)
    })

    // write the result back to HBase
    var admin = new HBaseAdmin(hBaseConf)

    // create the target table if it does not exist yet
    if (!admin.isTableAvailable(_saveTableName)) {
      System.out.println("Table Not Exists! Create Table")
      var tableDesc = new HTableDescriptor(_saveTableName)
      tableDesc.addFamily(new HColumnDescriptor(_saveFalmily.getBytes()))
      admin.createTable(tableDesc)
    }

    var saveconfiguration = HBaseConfiguration.create()
    saveconfiguration.set("hbase.zookeeper.property.clientPort", _hbase_zookeeper_property_clientPort)
    saveconfiguration.set("hbase.zookeeper.quorum", _hbase_zookeeper_quorum)
    // Job here is org.apache.hadoop.mapreduce.Job (the new MapReduce API)
    var job = Job.getInstance(saveconfiguration)
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, _saveTableName)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // key difference from the Java version: pay special attention here

    saveRDD.saveAsNewAPIHadoopDataset(job.getConfiguration())

  }



  // initialize the fields from config_scala.properties
  def init(): Unit = {

    _isWinTest = PropertiesScalaUtils.getValueByKey("isWinTest")

    _hbase_zookeeper_property_clientPort = PropertiesScalaUtils.getValueByKey("hbase_zookeeper_property_clientPort")
    _hbase_zookeeper_quorum = PropertiesScalaUtils.getValueByKey("hbase_zookeeper_quorum")

    _sorceTableName = PropertiesScalaUtils.getValueByKey("source.TableName")
    _sorceTableFalmily = PropertiesScalaUtils.getValueByKey("source.falmily")
    // mapping from column index to field name, e.g. 0 -> number, 1 -> zjhm
    var fileds = PropertiesScalaUtils.getValueByKey("source.fields").split(",")
    for (i <- 0 to fileds.length - 1) {
      println("===i=>" + i + " fileds(i)=" + fileds(i))
      _filedsMap.put(i, fileds(i))
    }
    _sourceNumber = Integer.parseInt(PropertiesScalaUtils.getValueByKey("source.sourceNum"))

    // mapping from data-source code to its weight, e.g. 00101 -> 100
    var sourceWeightfileds = PropertiesScalaUtils.getValueByKey("source.weight").split("=")
    for (i <- 0 to sourceWeightfileds.length - 1) {
      _sourceWeightMap.put(sourceWeightfileds(i).split(",")(0), Integer.parseInt(sourceWeightfileds(i).split(",")(1)))
    }

    _saveTableName = PropertiesScalaUtils.getValueByKey("save.TableName")
    _saveFalmily = PropertiesScalaUtils.getValueByKey("save.falmily")

  }

}
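
The listing refers to a helper object, PropertiesScalaUtils, whose source is not shown in this post. A minimal sketch of what it might look like, assuming it simply loads config_scala.properties from the classpath (the real implementation in the project may differ):

package scala

import java.util.Properties

// Hypothetical sketch of the PropertiesScalaUtils helper used above; the project's
// actual implementation is not shown in this post.
object PropertiesScalaUtils {

  // load config_scala.properties once from the classpath
  private val props = new Properties()
  private val in = getClass.getClassLoader.getResourceAsStream("config_scala.properties")
  if (in != null) {
    props.load(in)
    in.close()
  }

  // return the raw string value for a key (null if the key is missing)
  def getValueByKey(key: String): String = props.getProperty(key)
}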