Java客户端访问HBase集群解决方案(优化)

时间:2022-12-14 08:33:20

测试环境:Idea+Windows10

准备工作:

   <1>、打开本地 C:\Windows\System32\drivers\etc(系统默认)下名为hosts的系统文件,如果提示当前用户没有权限打开文件;第一种方法是将hosts文件拖到桌面进行配置后再拖回原处;第二种一劳永逸的方法是修改当前用户对该文件的权限为完全控制;

   <2>、打开hosts文件后,添加HBase集群各服务器的主机名及IP地址(每行格式为"IP地址 主机名"),如下:

Java客户端访问HBase集群解决方案(优化)

Java客户端访问HBase集群解决方案(优化)

   <3>、由于是在windows系统下远程连接HBase,而HBase底层依赖Hadoop,所以需要下载hadoop二进制包并存放到本地目录,之后会在程序中引用该目录,否则会报错。你也可以理解为windows下需要模拟linux环境才能正常连接HBase;(注:windows下的hadoop版本需要和linux下一致,这里我仅提供hadoop 2.6.0版本的二进制包)

程序代码:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the spring_hbase demo: a Spring Boot web application plus HBase/Hadoop client libraries. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- Project coordinates -->
	<groupId>com.example</groupId>
	<artifactId>spring_hbase</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring_hbase</name>
	<description>Demo project for Spring Boot</description>

	<!-- Versions of the Spring Boot starters below are inherited from this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- HBase dependencies -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.2.0</version>
			<!-- Presumably excluded so the log4j SLF4J binding does not clash with Spring Boot's default logging; TODO confirm. -->
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-hadoop</artifactId>
			<version>2.5.0.RELEASE</version>
		</dependency>
		<!-- NOTE(review): 2.5.1 differs from the 2.6.0 local hadoop package mentioned in the setup notes; confirm it matches the cluster. -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.5.1</version>
		</dependency>
		<!-- NOTE(review): 2.4.0.RELEASE differs from spring-data-hadoop 2.5.0.RELEASE above; confirm the mix is intended. -->
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-hadoop-core</artifactId>
			<version>2.4.0.RELEASE</version>
		</dependency>
		<!-- NOTE(review): 1.2.1 differs from hbase-client 1.2.0 above; confirm the mix is intended. -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>1.2.1</version>
			<type>pom</type>
		</dependency>
		<!-- end of HBase dependencies -->
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>
Java客户端访问HBase集群解决方案(优化)

HBaseUtils.java:

package com.example.spring_hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.springframework.data.hadoop.hbase.HbaseTemplate; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; /** * HBase工具类 * Author JiaPeng_lv */ public class HBaseUtils { private static Connection connection; private static Configuration configuration; private static HBaseUtils hBaseUtils; private static Properties properties; /** * 创建连接池并初始化环境配置 */ public void init(){ properties = System.getProperties(); //实例化HBase配置类 if (configuration==null){ configuration = HBaseConfiguration.create(); } try { //加载本地hadoop二进制包 properties.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.6.0-bin-master"); //zookeeper集群的URL配置信息 configuration.set("hbase.zookeeper.quorum","k1,k2,k3,k4,k5"); //HBase的Master configuration.set("hbase.master","hba:60000"); //客户端连接zookeeper端口 configuration.set("hbase.zookeeper.property.clientPort","2181"); //HBase RPC请求超时时间,默认60s(60000) configuration.setInt("hbase.rpc.timeout",20000); //客户端重试最大次数,默认35 configuration.setInt("hbase.client.retries.number",10); //客户端发起一次操作数据请求直至得到响应之间的总超时时间,可能包含多个RPC请求,默认为2min configuration.setInt("hbase.client.operation.timeout",30000); //客户端发起一次scan操作的rpc调用至得到响应之间的总超时时间 configuration.setInt("hbase.client.scanner.timeout.period",200000); //获取hbase连接对象 if (connection==null||connection.isClosed()){ connection = ConnectionFactory.createConnection(configuration); } } catch (IOException e) { e.printStackTrace(); } } /** * 关闭连接池 */ public static void close(){ try { if (connection!=null)connection.close(); } catch (IOException e) { e.printStackTrace(); } } /** * 私有无参构造方法 */ private HBaseUtils(){} /** * 唯一实例,线程安全,保证连接池唯一 * @return */ public static HBaseUtils getInstance(){ if (hBaseUtils == null){ synchronized (HBaseUtils.class){ if 
(hBaseUtils == null){ hBaseUtils = new HBaseUtils(); hBaseUtils.init(); } } } return hBaseUtils; } /** * 获取单条数据 * @param tablename * @param row * @return * @throws IOException */ public static Result getRow(String tablename, byte[] row) throws IOException{ Table table = null; Result result = null; try { table = connection.getTable(TableName.valueOf(tablename)); Get get = new Get(row); result = table.get(get); }finally { table.close(); } return result; } /** * 查询多行信息 * @param tablename * @param rows * @return * @throws IOException */ public static Result[] getRows(String tablename,List<byte[]> rows) throws IOException{ Table table = null; List<Get> gets = null; Result[] results = null; try { table = connection.getTable(TableName.valueOf(tablename)); gets = new ArrayList<Get>(); for (byte[] row : rows){ if(row!=null){ gets.add(new Get(row)); } } if (gets.size() > 0) { results = table.get(gets); } } catch (IOException e) { e.printStackTrace(); }finally { table.close(); } return results; } /** * 获取整表数据 * @param tablename * @return */ public static ResultScanner get(String tablename) throws IOException{ Table table = null; ResultScanner results = null; try { table = connection.getTable(TableName.valueOf(tablename)); Scan scan = new Scan(); scan.setCaching(1000); results = table.getScanner(scan); } catch (IOException e) { e.printStackTrace(); }finally { table.close(); } return results; } /** * 单行插入数据 * @param tablename * @param rowkey * @param family * @param cloumns * @throws IOException */ public static void put(String tablename, String rowkey, String family, Map<String,String> cloumns) throws IOException{ Table table = null; try { table = connection.getTable(TableName.valueOf(tablename)); Put put = new Put(rowkey.getBytes()); for (Map.Entry<String,String> entry : cloumns.entrySet()){ put.addColumn(family.getBytes(),entry.getKey().getBytes(),entry.getValue().getBytes()); } table.put(put); } catch (IOException e) { e.printStackTrace(); }finally { table.close(); close(); 
} } } 
Java客户端访问HBase集群解决方案(优化)

①、保证该工具类唯一实例

②、全局共享重量级类Connection,该类为线程安全,使用完毕后关闭连接池

③、每次执行内部CRUD方法会创建唯一对象Table,该类为非线程安全,使用完毕后关闭

由于时间原因,内部功能方法及测试较少,有其他需求的可以自行百度添加更多方法,这里主要以类结构及配置为主。

SpringHbaseApplicationTests.java(测试类):

package com.example.spring_hbase; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; import java.util.*; @RunWith(SpringRunner.class) @SpringBootTest public class SpringHbaseApplicationTests { @Test public void contextLoads() { } @Test public void test01(){ HBaseUtils.getInstance(); try { Long time = System.currentTimeMillis(); Result result = HBaseUtils.getRow("GPS_MAP", Bytes.toBytes(1)); System.out.println("本次查询耗时:"+(System.currentTimeMillis()-time)*1.0/1000+"s"); NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap(); for (byte[] family:navigableMap.keySet()){ System.out.println("columnFamily:"+ new String(family)); for (byte[] column : navigableMap.get(family).keySet()){ System.out.println("column:"+new String(column)); for (Long t : navigableMap.get(family).get(column).keySet()){ System.out.println("value:"+new String(navigableMap.get(family).get(column).get(t))); } } } } catch (IOException e) { e.printStackTrace(); }finally { HBaseUtils.close(); } } @Test public void test02(){ HBaseUtils.getInstance(); ResultScanner results = null; try { Long time = System.currentTimeMillis(); results = HBaseUtils.get("GPS_MAP"); System.out.println("本次查询耗时:"+(System.currentTimeMillis()-time)*1.0/1000+"s"); for (Result result : results){ NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap(); for (byte[] family:navigableMap.keySet()){ System.out.println("columnFamily:"+ new String(family)); for (byte[] column : navigableMap.get(family).keySet()){ System.out.println("column:"+new String(column)); for (Long t : navigableMap.get(family).get(column).keySet()){ System.out.println("value:"+new 
String(navigableMap.get(family).get(column).get(t))); } } } } } catch (IOException e) { e.printStackTrace(); }finally { results.close(); HBaseUtils.close(); } } @Test public void test03(){ HBaseUtils.getInstance(); Result[] results = null; List<byte[]> list = null; try { list = new ArrayList<byte[]>(); list.add(Bytes.toBytes(1)); list.add(Bytes.toBytes(2)); Long time = System.currentTimeMillis(); results = HBaseUtils.getRows("GPS_MAP",list); System.out.println("本次查询耗时:"+(System.currentTimeMillis()-time)*1.0/1000+"s"); for (Result result : results){ NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap(); for (byte[] family:navigableMap.keySet()){ System.out.println("columnFamily:"+ new String(family)); for (byte[] column : navigableMap.get(family).keySet()){ System.out.println("column:"+new String(column)); for (Long t : navigableMap.get(family).get(column).keySet()){ System.out.println("value:"+new String(navigableMap.get(family).get(column).get(t))); } } } } } catch (IOException e) { e.printStackTrace(); }finally { HBaseUtils.close(); } } @Test public void test04(){ HBaseUtils.getInstance(); try { Map<String,String> cloumns = new HashMap<String, String>(); cloumns.put("test01","test01"); cloumns.put("test02","test02"); Long time = System.currentTimeMillis(); HBaseUtils.put("GPS_MAP","3","TEST",cloumns); System.out.println("本次插入耗时:"+(System.currentTimeMillis()-time)*1.0/1000+"s"); } catch (IOException e) { e.printStackTrace(); }finally { HBaseUtils.close(); } } } 
Java客户端访问HBase集群解决方案(优化)

测试后发现,相对于没有优化过的工具类,查询和插入的耗时大大缩减;