spark集成hbase与hive数据转换与代码练习

时间:2023-03-08 22:58:57
spark集成hbase与hive数据转换与代码练习

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

 import java.util.Date

 import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Scan, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf} /**
* Created by ysy on 2/10/17.
*/
object test { case class ysyTest(LS_certifier_no: String,loc: String,LS_phone_no: String) def main (args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("ysy").set("spark.executor.memory", "1g")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
sqlContext.sql("drop table pkq")
val columns = "LS_certifier_no,LS_location,LS_phone_no"
val hbaseRDD = dataInit(sc,"EVENT_LOG_LBS",columns).map(data =>{
val id =Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
val loc = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
(id,loc,phone)
})
val showData = hbaseRDD.foreach(println)
val datas = hbaseRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null)
val hiveDF = initHiveTableFromHbase(sc:SparkContext,sqlContext,datas)
writeHiveTableToHbase(sc,hiveDF) } def initHiveTableFromHbase(sc:SparkContext,sqlContext: HiveContext,hiveRDD:RDD[(String,String,String)]) : DataFrame = {
val hRDD = hiveRDD.map(p => ysyTest(p._1,p._2,p._3))
val hiveRDDSchema = sqlContext.createDataFrame(hiveRDD)
hiveRDDSchema.registerTempTable("pkq")
hiveRDDSchema.show(10)
hiveRDDSchema
} def dataInit(sc : SparkContext,tableName : String,columns : String) : RDD[(ImmutableBytesWritable,Result)] = {
val configuration = HBaseConfiguration.create()
configuration.addResource("hbase-site.xml")
configuration.set(TableInputFormat.INPUT_TABLE,tableName )
val scan = new Scan
val column = columns.split(",")
for(columnName <- column){
scan.addColumn("f1".getBytes(),columnName.getBytes())
}
val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
System.out.println(hbaseRDD.count())
hbaseRDD
} def writeHiveTableToHbase(sc : SparkContext,hiveDF : DataFrame) = {
val configuration = HBaseConfiguration.create()
configuration.addResource("hbase-site.xml ")
configuration.set(TableOutputFormat.OUTPUT_TABLE,"EVENT_LOG_LBS")
val jobConf = new JobConf(configuration)
jobConf.setOutputFormat(classOf[TableOutputFormat]) val putData = hiveDF.map(data =>{
val LS_certifier_no = data(0)
val LS_location = data(1)
val LS_phone_no = data(2)
(LS_certifier_no,LS_location,LS_phone_no)
}) val rdd = putData.map(datas =>{
val put = new Put(Bytes.toBytes(Math.random()))
put.addColumn("f1".getBytes(),"LS_certifier_no".getBytes(),Bytes.toBytes(datas._1.toString))
put.addColumn("f1".getBytes(),"LS_location".getBytes(),Bytes.toBytes(datas._2.toString))
put.addColumn("f1".getBytes(),"LS_phone_no".getBytes(),Bytes.toBytes(datas._3.toString))
(new ImmutableBytesWritable, put)
})
val showRdd = rdd.foreach(println)
rdd.saveAsHadoopDataset(jobConf)
} }

spark集成hbase与hive数据转换与代码练习

spark集成hbase与hive数据转换与代码练习