Hbase + Spring Aop 配置Hbase链接的开启和关闭

时间:2022-01-09 02:41:09

本文转载自http://blog.csdn.net/whos2002110/article/details/36874389

spring 提供了HbaseTemplate 对Hbase数据库的常规操作进行了简单的封装。

get,find方法分别对应了单行数据查询和list查询。

这些查询都要开启和关闭hbase数据库链接

[java] view plain copy
  1. @Override  
  2.     public <T> T execute(String tableName, TableCallback<T> action) {  
  3.         Assert.notNull(action, "Callback object must not be null");  
  4.         Assert.notNull(tableName, "No table specified");  
  5.   
  6.         HTableInterface table = getTable(tableName);  
  7.   
  8.         try {  
  9.             boolean previousFlushSetting = applyFlushSetting(table);  
  10.             T result = action.doInTable(table);  
  11.             flushIfNecessary(table, previousFlushSetting);  
  12.             return result;  
  13.         } catch (Throwable th) {  
  14.             if (th instanceof Error) {  
  15.                 throw ((Error) th);  
  16.             }  
  17.             if (th instanceof RuntimeException) {  
  18.                 throw ((RuntimeException) th);  
  19.             }  
  20.             throw convertHbaseAccessException((Exception) th);  
  21.         } finally {  
  22.             releaseTable(tableName, table);  
  23.         }  
  24.     }  
  25.   
  26.     private HTableInterface getTable(String tableName) {  
  27.         return HbaseUtils.getHTable(tableName, getConfiguration(), getCharset(), getTableFactory());  
  28.     }  
  29.   
  30.     private void releaseTable(String tableName, HTableInterface table) {  
  31.         HbaseUtils.releaseTable(tableName, table, getTableFactory());  
  32.     }  
HTableInterface table = getTable(tableName); 获取数据库链接

releaseTable(tableName, table); 释放链接

在HbaseUtils.getHTable:

[java] view plain copy
  1. if (HbaseSynchronizationManager.hasResource(tableName)) {  
  2.             return (HTable) HbaseSynchronizationManager.getResource(tableName);  
  3.         }  

看见这个大家应该都有是曾相似的感觉吧,这和Spring事务管理核心类TransactionSynchronizationManager很像,而实现也基本一样

都是通过ThreadLocal将链接保存到当前线程中。

我们要做的就是要像Srping 事务配置一样,在进入service方法时通过Aop机制将tableNames对应的链接加入到线程中。

Spring提供了这个Aop方法拦截器 HbaseInterceptor:

[java] view plain copy
  1. public Object invoke(MethodInvocation methodInvocation) throws Throwable {  
  2.         Set<String> boundTables = new LinkedHashSet<String>();  
  3.           
  4.         for (String tableName : tableNames) {  
  5.             if (!HbaseSynchronizationManager.hasResource(tableName)) {  
  6.                 boundTables.add(tableName);  
  7.                 HTableInterface table = HbaseUtils.getHTable(tableName, getConfiguration(), getCharset(), getTableFactory());  
  8.                 HbaseSynchronizationManager.bindResource(tableName, table);  
  9.             }  
  10.         }  
  11.           
  12.         try {  
  13.             Object retVal = methodInvocation.proceed();  
  14.             return retVal;  
  15.         } catch (Exception ex) {  
  16.             if (this.exceptionConversionEnabled) {  
  17.                 throw convertHBaseException(ex);  
  18.             }  
  19.             else {  
  20.                 throw ex;  
  21.             }  
  22.         } finally {  
  23.             for (String tableName : boundTables) {  
  24.                 HTableInterface table = (HTableInterface) HbaseSynchronizationManager.unbindResourceIfPossible(tableName);  
  25.                 if (table != null) {  
  26.                     HbaseUtils.releaseTable(tableName, table);  
  27.                 }  
  28.                 else {  
  29.                     log.warn("Table [" + tableName + "] unbound from the thread by somebody else; cannot guarantee proper clean-up");  
  30.                 }  
  31.             }  
  32.         }  
  33.     }  

很明显在

[java] view plain copy
  1. Object retVal = methodInvocation.proceed();  
也就是我们的service方法执行前去获取Hbase链接并通过HbaseSynchronizationManager.bindResource(tableName, table);绑定到线程中。

finally中releaseTable。


Aop配置如下:

[html] view plain copy
  1. <!-- 自动扫描beans+注解功能注册 -->  
  2.     <context:component-scan base-package="com.xxx.xxx" />  
  3.       
  4.     <!-- 根据配置文件生成hadoopConfiguration -->  
  5.     <hdp:configuration resources="classpath:/hbase-site.xml" />  
  6.       
  7.     <!-- hadoopConfiguration == hdp:configuration -->  
  8. <!--     <hdp:hbase-configuration configuration-ref="hadoopConfiguration" /> -->  
  9.   
  10.     <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">  
  11.         <!-- hadoopConfiguration == hdp:configuration -->  
  12.         <property name="configuration" ref="hadoopConfiguration" />  
  13.     </bean>  
  14.       
  15.     <bean id="hbaseInterceptor" class="org.springframework.data.hadoop.hbase.HbaseInterceptor">  
  16.         <property name="configuration" ref="hadoopConfiguration" />  
  17.         <property name="tableNames">  
  18.             <list>  
  19.                 <value>table_name1</value>  
  20.                 <value>table_name2</value>  
  21.             </list>  
  22.         </property>  
  23.     </bean>  
  24.       
  25.     <!-- 使用aop增强, 织入hbase数据库链接的开启和关闭  -->  
  26.     <aop:config>  
  27.         <aop:pointcut id="allManagerMethod"  
  28.             expression="execution(* com.xxx.xxx.*.service..*(..))" />  
  29.         <aop:advisor advice-ref="hbaseInterceptor" pointcut-ref="allManagerMethod" />  
  30.     </aop:config>  

Hbase的数据库表链接跟传统数据库不太一样, 开启链接必需要表名, 所以HbaseInterceptor中必需设置private String[] tableNames;

在进入servcie方法时,tableNames中对应的表链接都会开启。这必然会造成浪费,因为并不是每个service都会把表都查询一遍。