Three Ways to Write Data to HBase from Spark


Method 1: Use the Put method of the HBase Table API directly

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Description: Use the Put method of the HBase client to insert data into HBase from Spark Streaming.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkPut {

  /**
    * insert 100,000 cost 20762 ms
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkPut")
    val context = new SparkContext(conf)

    try {
      val rdd = context.makeRDD(1 to 100000, 4)

      // column family
      val family = Bytes.toBytes("cf")
      // column counter --> ctr
      val column = Bytes.toBytes("ctr")

      println("count is :" + rdd.count())
      rdd.take(5).foreach(println)

      // mapPartitions & foreachPartition:
      // mapPartitions is a lazy transformation; without an action, nothing runs.
      // foreachPartition is an action.
      rdd.foreachPartition(list => {
        // one Connection/Table per partition, not per record
        val table = createTable()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          table.put(put)
        })
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  /**
    * Create an HBase Table instance.
    *
    * @return
    */
  def createTable(): Table = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // skip the client/server version check
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    conn.getTable(TableName.valueOf("test_table"))
  }
}
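All three examples assume that a table named test_table with a column family cf already exists. If it does not, it can be created once up front; here is a minimal sketch using the HBase 1.x Admin API (only the table and family names come from the code above, the rest is an assumption about your setup):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    // assumes hbase-site.xml (or the zookeeper settings shown above) is on the classpath
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val name = TableName.valueOf("test_table")
    if (!admin.tableExists(name)) {
      val desc = new HTableDescriptor(name)
      desc.addFamily(new HColumnDescriptor("cf"))
      admin.createTable(desc)
    }
    admin.close()
    conn.close()
  }
}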


Method 2: Put(List)

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import java.util.ArrayList
import scala.collection.JavaConversions._

/**
  * Description: Use Mutator / Put-list batch inserts in a Spark context.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkPutList {

  /**
    * Use mutator batch insert of 100,000 Puts, cost: 22369 ms
    * Use put list insert of 100,000, cost: 25571 ms
    * Use put list built via map, 100,000, cost: 21299 ms
    *
    * @param args
    */
  def main(args: Array[String]): Unit = {
    //    putByList()
    putByMap()
  }

  def putByMap(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.map(value => {
        val put = new Put(Bytes.toBytes(value))
        // addColumn returns the Put itself, so this map yields an RDD[Put]
        put.addColumn(family, column, Bytes.toBytes(value))
      }).foreachPartition(
        itr => {
          val hbaseConf = HBaseConfiguration.create()
          val conn = ConnectionFactory.createConnection(hbaseConf)
          val table = conn.getTable(TableName.valueOf("test_table"))
          // implicit JavaConversions turns the Scala list into a java.util.List
          table.put(itr.toList)
          table.close()
        })
    } finally {
      context.stop()
    }
  }

  def putByList(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf("test_table"))
        val putList = new ArrayList[Put]()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          putList.add(put)
        })
        // one batched RPC round per partition instead of one per Put
        table.put(putList)
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  def putByMutator(): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.makeRDD(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val mutator = conn.getBufferedMutator(TableName.valueOf("test_table"))
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          mutator.mutate(put)
        })
        // close() flushes any mutations still in the client-side buffer
        mutator.close()
      })
    } finally {
      context.stop()
    }
  }
}
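putByMutator relies on the connection's default client-side write buffer. On HBase 1.x you can size that buffer explicitly through BufferedMutatorParams; a minimal sketch (the 4 MB figure is an arbitrary illustration, not a value from the original post):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutatorParams, ConnectionFactory}

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
// buffer up to 4 MB of mutations client-side before flushing to the region servers
val params = new BufferedMutatorParams(TableName.valueOf("test_table"))
  .writeBufferSize(4 * 1024 * 1024)
val mutator = conn.getBufferedMutator(params)
// then use mutator.mutate(put) as in putByMutator above, and mutator.close() when done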


Method 3: Write to HBase via a MapReduce job

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Description: Put data into Hbase by map reduce Job.
  *
  * Author : Adore Chen
  * Created: 2017-12-22
  */
object SparkMapJob {

    /**
      * insert 100,000 cost 21035 ms
      *
      * @param args
      */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("SparkPutByMap")
      val context = new SparkContext(conf)

      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table")
      // IMPORTANT: must be set, otherwise the job fails with
      // "Can not create a Path from a null string"
      hbaseConf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

      val job = Job.getInstance(hbaseConf)
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put])

      try {
        val rdd = context.makeRDD(1 to 100000)

        // column family
        val family = Bytes.toBytes("cf")
        // column counter --> ctr
        val column = Bytes.toBytes("ctr")

        rdd.map(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          // TableOutputFormat writes only the value, so an empty key is enough
          (new ImmutableBytesWritable(), put)
        })
          .saveAsNewAPIHadoopDataset(job.getConfiguration)
      } finally {
        context.stop()
      }
    }

}
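Whichever variant you run, you can sanity-check the result by reading a row back with a plain Get; a minimal sketch, assuming the same test_table/cf/ctr layout as above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("test_table"))
// row keys were written as Bytes.toBytes(value) for an Int value
val result = table.get(new Get(Bytes.toBytes(1)))
println(Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ctr"))))
table.close()
conn.close()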


Test environment:


ext {
    sparkVersion = '2.3.0'
}

dependencies {

    compile 'org.slf4j:slf4j-api:1.7.25'
    compile 'org.apache.logging.log4j:log4j-api:2.11.0'
    compile 'org.apache.logging.log4j:log4j-core:2.11.0'

    // spark related
    compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-sql_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}"

    // hbase related
    compile 'org.apache.hbase:hbase-client:1.3.1'
    compile 'org.apache.hbase:hbase-server:1.3.1'

    // redis related
    compile 'redis.clients:jedis:2.9.0'

    // mysql connector
    compile 'mysql:mysql-connector-java:5.1.46'

    // hive jdbc
    compile 'org.apache.hive:hive-jdbc:2.3.3'

    compile 'org.apache.logging.log4j:log4j:2.11.0'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-tools:1.8.2'

    testCompile 'junit:junit:4.12'
}


Notes on usage scenarios:

The first two approaches are mainly for standalone use of HBase.

The third approach is for integration with engines such as Spark and Flink.

Note: the code above only demonstrates the APIs. When integrating with Spark or Flink, the HBase Connection is best managed as a singleton.

For reference, see the code in HBaseUtil:

/adorechen/article/details/106058924
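In case the link is unavailable, here is a minimal sketch of what such a singleton helper can look like (the name HBaseUtil matches the reference, but the body is an assumption, not the actual code from that article):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// One heavyweight Connection per executor JVM; Table and BufferedMutator
// instances are lightweight and can be created per partition as needed.
object HBaseUtil {
  private var conn: Connection = _

  def getConnection: Connection = synchronized {
    if (conn == null || conn.isClosed) {
      conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    }
    conn
  }
}

Each executor JVM initializes the object lazily on first use, so every partition processed on that executor reuses the same Connection instead of opening a new one.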
