Method 1: Insert directly with the HBase Table put method
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Description: Use the Put method of the HBase client to insert data into HBase from Spark Streaming.
 *
 * Author : Adore Chen
 * Created: 2017-12-22
 */
object SparkPut {

  /**
   * insert 100,000 cost 20762 ms
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkPut")
    val context = new SparkContext(conf)

    try {
      val rdd = context.parallelize(1 to 100000, 4)

      // column family
      val family = Bytes.toBytes("cf")
      // column counter --> ctr
      val column = Bytes.toBytes("ctr")

      println("count is: " + rdd.count())
      rdd.take(5).foreach(println)

      // mapPartitions & foreachPartition:
      // mapPartitions is a lazy transformation; without an action there is no result.
      // foreachPartition is an action.
      rdd.foreachPartition(list => {
        val table = createTable()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          table.put(put)
        })
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  /**
   * create HBase Table interface.
   *
   * @return
   */
  def createTable(): Table = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    conn.getTable(TableName.valueOf("test_table"))
  }
}
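All three examples assume that test_table with column family cf already exists. If it does not, it can be created once up front with the HBase 1.x admin API. The sketch below is not part of the original post; the quorum setting mirrors the localhost value used above:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// One-off helper (illustrative sketch): create "test_table" with family "cf" if it is missing.
object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "localhost") // same assumption as the examples
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val admin = conn.getAdmin
    try {
      val name = TableName.valueOf("test_table")
      if (!admin.tableExists(name)) {
        val desc = new HTableDescriptor(name)
        desc.addFamily(new HColumnDescriptor("cf"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}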
Method 2: Batch insert with Put(List) or BufferedMutator
import java.util

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

/**
 * Description: Use Mutator batch insert in spark context.
 *
 * Author : Adore Chen
 * Created: 2017-12-22
 */
object SparkPutList {

  /**
   * Use mutator batch insert 100,000, (Put) cost: 22369
   * Use put list insert 100,000, cost: 25571
   * Use put list by Map 100,000, cost: 21299
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    // putByList()
    // putByMutator()
    putByMap()
  }

  def putByMap(): Unit = {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.parallelize(1 to 100000, 4)
      rdd.map(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          put
        })
        .foreachPartition(itr => {
          val hbaseConf = HBaseConfiguration.create()
          val conn = ConnectionFactory.createConnection(hbaseConf)
          val table = conn.getTable(TableName.valueOf("test_table"))
          table.put(itr.toList.asJava)
          table.close()
        })
    } finally {
      context.stop()
    }
  }

  def putByList(): Unit = {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.parallelize(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf("test_table"))
        val putList = new util.ArrayList[Put]()
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          putList.add(put)
        })
        table.put(putList)
        table.close()
      })
    } finally {
      context.stop()
    }
  }

  def putByMutator(): Unit = {
    val conf = new SparkConf().setAppName(getClass.getSimpleName)
    val context = new SparkContext(conf)

    // column family
    val family = Bytes.toBytes("cf")
    // column counter --> ctr
    val column = Bytes.toBytes("ctr")

    try {
      val rdd = context.parallelize(1 to 100000, 4)
      rdd.foreachPartition(list => {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val mutator = conn.getBufferedMutator(TableName.valueOf("test_table"))
        list.foreach(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          mutator.mutate(put)
        })
        mutator.close()
      })
    } finally {
      context.stop()
    }
  }
}
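putByMutator relies on the connection's default write buffer. If explicit control is wanted, BufferedMutatorParams lets you set the buffer size before obtaining the mutator. The following is a minimal sketch; the 4 MB value and the MutatorExample name are illustrative assumptions, not from the original code:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{BufferedMutator, BufferedMutatorParams, Connection}

object MutatorExample {
  // Sketch: obtain a BufferedMutator with an explicit write buffer size.
  def createMutator(conn: Connection): BufferedMutator = {
    val params = new BufferedMutatorParams(TableName.valueOf("test_table"))
      .writeBufferSize(4 * 1024 * 1024) // 4 MB is an assumed value; tune for your workload
    conn.getBufferedMutator(params)
  }
}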
Method 3: Write to HBase through a MapReduce job (saveAsNewAPIHadoopDataset)
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Description: Put data into HBase by a MapReduce job.
 *
 * Author : Adore Chen
 * Created: 2017-12-22
 */
object SparkMapJob {

  /**
   * insert 100,000 cost 21035 ms
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkPutByMap")
    val context = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table")
    // IMPORTANT: must set the output dir to solve the problem "can't create path from null string"
    hbaseConf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp")

    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    try {
      val rdd = context.parallelize(1 to 100000)

      // column family
      val family = Bytes.toBytes("cf")
      // column counter --> ctr
      val column = Bytes.toBytes("ctr")

      rdd.map(value => {
          val put = new Put(Bytes.toBytes(value))
          put.addColumn(family, column, Bytes.toBytes(value))
          (new ImmutableBytesWritable(), put)
        })
        .saveAsNewAPIHadoopDataset(job.getConfiguration)
    } finally {
      context.stop()
    }
  }
}
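For completeness, the same Hadoop-job integration works in the read direction as well: TableInputFormat exposes an HBase table as an RDD of (ImmutableBytesWritable, Result) pairs via newAPIHadoopRDD. This read-back sketch is not part of the original benchmark:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: read "test_table" back into an RDD via TableInputFormat.
object SparkReadJob {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setAppName("SparkReadByMap"))
    try {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "test_table")

      val rdd = context.newAPIHadoopRDD(
        hbaseConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      println("rows read: " + rdd.count())
    } finally {
      context.stop()
    }
  }
}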
Test environment (Gradle dependencies):
ext {
    sparkVersion = '2.3.0'
}

dependencies {
    compile 'org.slf4j:slf4j-api:1.7.25'
    compile 'org.apache.logging.log4j:log4j-api:2.11.0'
    compile 'org.apache.logging.log4j:log4j-core:2.11.0'

    // spark related
    compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-sql_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}"

    // hbase related
    compile 'org.apache.hbase:hbase-client:1.3.1'
    compile 'org.apache.hbase:hbase-server:1.3.1'

    // redis related
    compile 'redis.clients:jedis:2.9.0'

    // mysql connector
    compile 'mysql:mysql-connector-java:5.1.46'

    // hive jdbc
    compile 'org.apache.hive:hive-jdbc:2.3.3'
    compile 'org.apache.logging.log4j:log4j:2.11.0'

    compile 'org.apache.avro:avro:1.8.2'
    compile 'org.apache.avro:avro-tools:1.8.2'

    testCompile 'junit:junit:4.12'
}
Notes on usage scenarios:
Methods 1 and 2 are mainly for standalone use of HBase, i.e. writing from your own client code.
Method 3 is for integration with engines such as Spark and Flink.
Note: the code above only demonstrates the APIs. When integrating with Spark or Flink, the HBase Connection is best managed as a singleton (one per executor JVM); a rough sketch is shown after the reference link below.
For reference, see the code in HBaseUtil:
/adorechen/article/details/106058924
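As a rough illustration of the singleton idea only (not the actual HBaseUtil code behind the link above), a per-JVM Connection can be held in a lazily initialized Scala object; the HBaseConnectionHolder name and the localhost quorum are assumptions for this sketch:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hypothetical sketch: one HBase Connection per executor JVM.
// A Scala object is initialized lazily and exactly once per JVM,
// so every task on that executor reuses the same Connection.
object HBaseConnectionHolder {
  lazy val conn: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost") // assumed quorum, as in the examples above
    val c = ConnectionFactory.createConnection(conf)
    sys.addShutdownHook(c.close()) // close the shared connection when the JVM exits
    c
  }
}

// Usage inside foreachPartition: only Table/BufferedMutator objects are created per partition,
// while the Connection itself is shared:
// rdd.foreachPartition { iter =>
//   val table = HBaseConnectionHolder.conn.getTable(TableName.valueOf("test_table"))
//   // ... build and put the Puts ...
//   table.close()
// }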