Spark中使用scala方式- 操作Hbase 表:增删改查

时间:2021-03-23 08:31:18
Auth: FuRenjie


在build.sbt中配置依赖(行之间需要空格)
name := "test2"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
 "org.apache.spark" % "spark-core" % "1.0.0",
  "org.apache.hbase" % "hbase" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-client" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-common" % "1.2.1-hadoop1",
  "org.apache.hbase" % "hbase-server" % "1.2.1-hadoop1"
)

version := "1.0"


在环境变量中配置该工程需要的JAR包,命名为SPARK_TEST_JAR。
————————————————————————————————————————

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase._
import org.apache.spark._
/**
 * Created by gongxuan on 2/3/15.
 * hadoop1.2.1  scala2.10.4  hbase0.98.9  spark1.0.0
 * NOTE: the build.sbt above pins hbase 1.2.1-hadoop1, which disagrees with
 * the hbase0.98.9 stated here — verify which version was actually used.
 *
 */
object HBaseTest {
  /**
   * Demo of basic HBase CRUD from Scala plus a Spark scan via TableInputFormat:
   * (re)creates table "test1", writes 10 rows, reads them back with Get,
   * scans the table as an RDD, prints every cell, then drops the table.
   */
  def main(args: Array[String]) {
    // Name of the demo table created (and finally dropped) by this example.
    val table_name = "test1"
    val conf = HBaseConfiguration.create
    val admin = new HBaseAdmin(conf)
    try {
      // Start from a clean slate: drop the table if a previous run left it behind.
      if (admin.tableExists(table_name)) {
        admin.disableTable(table_name)
        admin.deleteTable(table_name)
      }
      val htd = new HTableDescriptor(table_name)
      val hcd = new HColumnDescriptor("id")
      // Register the single column family "id" before creating the table.
      htd.addFamily(hcd)
      admin.createTable(htd)

      // Put 10 demo rows ("row1".."row10") into the table, then read each back.
      val tablename = htd.getName
      val table = new HTable(conf, tablename)
      try {
        val databytes = Bytes.toBytes("id")
        for (c <- 1 to 10) {
          val row = Bytes.toBytes("row" + c.toString)
          val p1 = new Put(row)
          p1.add(databytes, Bytes.toBytes(c.toString), Bytes.toBytes("value" + c.toString))
          table.put(p1)
        }
        for (c <- 1 to 10) {
          val g = new Get(Bytes.toBytes("row" + c.toString))
          println("Get:" + table.get(g))
        }
      } finally {
        table.close() // HTable was leaked in the original version
      }

      // Scan the whole table through Spark via the Hadoop input-format API.
      val config = HBaseConfiguration.create
      val sc = new SparkContext("local", "HBaseTest",
        System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
      try {
        config.set(TableInputFormat.INPUT_TABLE, table_name)

        // Build an RDD of (ImmutableBytesWritable, Result) pairs from HBase.
        // The key is the row key; the value is the full Result for that row.
        val hbaseRDD = sc.newAPIHadoopRDD(config, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])

        // Cache BEFORE the first action so count() populates the cache;
        // the original called cache() after count(), which cached nothing useful.
        hbaseRDD.cache()
        val count = hbaseRDD.count()
        println("HbaseRDD Count:" + count)

        // Print every cell of every row. Bytes.toString (UTF-8) is used instead
        // of new String(bytes), whose decoding depends on the platform charset.
        for ((_, result) <- hbaseRDD.take(count.toInt)) {
          // result.raw returns Array[KeyValue] — one entry per cell in the row.
          for (kv <- result.raw)
            println("row:" + Bytes.toString(kv.getRow()) +
              " cf:" + Bytes.toString(kv.getFamily()) +
              " column:" + Bytes.toString(kv.getQualifier()) +
              " value:" + Bytes.toString(kv.getValue()))
        }
      } finally {
        sc.stop() // release the Spark context (never stopped in the original)
      }

      // Drop the demo table so repeated runs start clean.
      admin.disableTable(table_name)
      admin.deleteTable(table_name)
    } finally {
      admin.close() // HBaseAdmin was leaked in the original version
    }
  }
}