Spring实现数据库读写分离

时间:2022-06-02 08:59:26

转载自:http://blog.csdn.net/zero__007/article/details/48711017

在数据库层面需要采用读写分离技术,就是一个master数据库,多个slave数据库。master库负责数据更新和实时数据查询,slave库负责非实时数据查询。因为在实际的应用中,数据库都是读多写少(读取数据的频率高,更新数据的频率相对较少),而读取数据通常耗时比较长,占用数据库服务器的CPU较多。

       采用读写分离技术的目标:有效减轻master库的压力,又可以把用户查询数据的请求分发到不同的slave库,从而保证系统的健壮性。

 

       如何方便的实现读写分离呢?
       1.第一种方式是最简单的方式,就是定义2个数据库连接,一个是masterDataSource,另一个是slaveDataSource。对DAO的dataSource属性注入是,根据需求分别如入不同的DataSource。有时由于在同一个DAO中可能既有select又有insert,那样就需要对同一类型的DAO写两个DAO,比如就可能需要将UserDaoImp拆分为UserDaoImp_w与UserDaoImp_r。
       2. 第二种方式,不将UserDaoImp拆分为UserDaoImp_w与UserDaoImp_r。在UserDaoImp的方法调用前使用this.setDataSource()来切换DataSource,但是存在线程安全问题。例如,线程一中,调用了f1(),将datasource切换为masterDataSource ,准备写数据到主数据库,但是这时线程二,调用了f2(),将datasource切换为slaveDataSource,准备从从数据库读数据,那么线程一继续执行f1()的方法就会出现问题。原因在于线程一、线程二使用的是一个UserDaoImp实例(spring加载的bean默认都是单例),共用的一个datasource。当然可不让UserDaoImp为单例,并结合ThreadLocal来避免线程安全问题,但是不推荐。
       3. 第三种方式动态数据源切换,就是在程序运行时,把数据源动态织入到程序中,从而选择读取主库还是从库。 这里Spring的AbstractRoutingDataSource提供了很好的支持。

[java] view plain copy 
  1. public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {  
  2.   
  3.     private Map<Object, Object> targetDataSources;  
  4.   
  5.     private Object defaultTargetDataSource;  
  6.   
  7.     private boolean lenientFallback = true;  
  8.   
  9.     private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();  
  10.   
  11.     private Map<Object, DataSource> resolvedDataSources;  
  12.   
  13.     private DataSource resolvedDefaultDataSource;  
  14.           
  15.         ......  

       AbstractRoutingDataSource继承了AbstractDataSource ,而AbstractDataSource 又是DataSource 的子类。DataSource是javax.sql的数据源接口,定义如下:

[java] view plain copy 
  1. public interface DataSource  extends CommonDataSource, Wrapper {  
  2.   
  3.   Connection getConnection() throws SQLException;  
  4.    
  5.   Connection getConnection(String username, String password)  
  6.     throws SQLException;  
  7. }  

       DataSource 接口定义了2个方法,都是获取数据库连接。在来看下AbstractRoutingDataSource 如何实现了DataSource接口:

[java] view plain copy 
  1. public Connection getConnection() throws SQLException {  
  2.     return determineTargetDataSource().getConnection();  
  3. }  
  4.   
  5. public Connection getConnection(String username, String password) throws SQLException {  
  6.     return determineTargetDataSource().getConnection(username, password);  
  7. }  

       AbstractRoutingDataSource通过调用determineTargetDataSource()方法获取到connection。determineTargetDataSource方法定义如下:

[java] view plain copy 
  1. protected DataSource determineTargetDataSource() {  
  2.     Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");  
  3.     Object lookupKey = determineCurrentLookupKey();  
  4.     DataSource dataSource = this.resolvedDataSources.get(lookupKey);  
  5.     if (dataSource == null && (this.lenientFallback || lookupKey == null)) {  
  6.         dataSource = this.resolvedDefaultDataSource;  
  7.     }  
  8.     if (dataSource == null) {  
  9.         throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");  
  10.     }  
  11.     return dataSource;  
  12. }  

      determineCurrentLookupKey()方法返回lookupKey, resolvedDataSources()方法就是根据 lookupKey从Map中获得数据源。

示例:
beans.xml

[java] view plain copy 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xsi:schemaLocation="  
  5.     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  
  6.   
  7.     <bean id="m_dataSource"  
  8.         class="org.springframework.jdbc.datasource.DriverManagerDataSource">  
  9.         <property name="driverClassName" value="com.mysql.jdbc.Driver" />  
  10.         <property name="url" value="jdbc:mysql://127.0.0.1:3306/zero" />  
  11.         <property name="username" value="root/>  
  12.         <property name="password" value="123456" />  
  13.     </bean>  
  14.   
  15.     <bean id="s_dataSource"  
  16.         class="org.springframework.jdbc.datasource.DriverManagerDataSource">  
  17.         <property name="driverClassName" value="com.mysql.jdbc.Driver" />  
  18.         <property name="url"  
  19.             value="jdbc:mysql://127.0.0.1:3306/zero_test" />  
  20.         <property name="username" value="root" />  
  21.         <property name="password" value="123456" />  
  22.     </bean>  
  23.   
  24.     <bean id="dynamicDataSource" class="com.zero.springjdbc.DynamicDataSource">  
  25.         <property name="targetDataSources">  
  26.             <map key-type="java.lang.String">  
  27.                 <!-- write -->  
  28.                 <entry key="master" value-ref="m_dataSource" />  
  29.                 <!-- read -->  
  30.                 <entry key="slave" value-ref="s_dataSource" />  
  31.             </map>  
  32.         </property>  
  33.         <property name="defaultTargetDataSource" ref="m_dataSource" />  
  34.     </bean>  
  35.   
  36.   
  37.     <bean id="zeroDaoImpl" class="com.zero.springjdbc.ZeroDaoImpl">  
  38.         <property name="dataSource" ref="dynamicDataSource"></property>  
  39.     </bean>  
  40.   
  41. </beans>  

       以上的beans.xml中,配置了两个DataSource,m_dataSource与s_dataSource,然后将这两个DataSource交给com.zero.springjdbc.DynamicDataSource管理,由于com.zero.springjdbc.DynamicDataSource是继承了AbstractRoutingDataSource,所以给它的两个属性targetDataSources、defaultTargetDataSource注入值。而DAO层的dataSource属性所使用的是dynamicDataSource。

DynamicDataSourceHolder.java

[java] view plain copy 
  1. package com.zero.springjdbc;  
  2.   
  3. public class DynamicDataSourceHolder {  
  4.     public static final ThreadLocal<String> holder = new ThreadLocal<String>();  
  5.   
  6.     public static void setDataSource(String name) {  
  7.         holder.set(name);  
  8.     }  
  9.   
  10.     public static String getDataSouce() {  
  11.         return holder.get();  
  12.     }  
  13. }  

       这里利用了ThreadLocal,来指明每个线程在进行数据库操作时所用到的数据库。

DynamicDataSource.java

[java] view plain copy 
  1. package com.zero.springjdbc;  
  2.   
  3. import java.sql.SQLFeatureNotSupportedException;  
  4. import java.util.logging.Logger;  
  5.   
  6. import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;  
  7.   
  8. public class DynamicDataSource extends AbstractRoutingDataSource{  
  9.   
  10.     @Override  
  11.     protected Object determineCurrentLookupKey() {  
  12.         // TODO Auto-generated method stub  
  13.         return DynamicDataSourceHolder.getDataSouce();  
  14.     }  
  15.   
  16.     @Override  
  17.     public Logger getParentLogger() throws SQLFeatureNotSupportedException {  
  18.         // TODO Auto-generated method stub  
  19.         return null;  
  20.     }  
  21. }  

ZeroDaoImpl.java

[java] view plain copy 
  1. package com.zero.springjdbc;  
  2.   
  3. import org.springframework.jdbc.core.JdbcTemplate;  
  4.   
  5. public class ZeroDaoImpl extends JdbcTemplate {  
  6.   
  7.     public void update1() {  
  8.         DynamicDataSourceHolder.setDataSource("master");  
  9.         String sql01 = "update lineitem_record set fee=111111 where lineitemId='11238'";  
  10.         System.out.println("update1 lineitemId='11238' : "  
  11.                 + this.update(sql01));  
  12.           
  13.         String sql02 = "update lineitem_record set fee=111111 where lineitemId='111'";  
  14.         System.out.println("update1 lineitemId='111' : "  
  15.                 + this.update(sql02));  
  16.     }  
  17.   
  18.     public void update2() {  
  19.         try {  
  20.               
  21.             DynamicDataSourceHolder.setDataSource("slave");  
  22.             String sql01 = "update lineitem_record set fee=111111 where lineitemId='11238'";  
  23.             System.out.println("update2 lineitemId='11238' : "  
  24.                     + this.update(sql01));  
  25.               
  26.             String sql02 = "update lineitem_record set fee=222222 where lineitemId='111'";  
  27.             System.out.println("update2 lineitemId='111' : "  
  28.                     + this.update(sql02));  
  29.         } catch (Exception e) {  
  30.             // TODO: handle exception  
  31.             e.printStackTrace();  
  32.         }  
  33.     }  
  34. }  

       这里的每个方法都DynamicDataSourceHolder.setDataSource(),来指明接下来的操作将使用的是什么数据库,由于被ThreadLocal隔离了,所以是线程安全的。

Test.java

[java] view plain copy 
  1. package com.zero.springjdbc;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class Test {  
  7.   
  8.     public static void main(String[] args) {  
  9.         // TODO Auto-generated method stub  
  10.         ApplicationContext ctx = new ClassPathXmlApplicationContext(  
  11.                 "/com/zero/springjdbc/beans.xml");  
  12.         ZeroDaoImpl zeroDaoImpl = (ZeroDaoImpl) ctx.getBean("zeroDaoImpl");  
  13.         for (int i = 0; i < 5; i++) {  
  14.             new Thread(new Runnable() {  
  15.   
  16.                 @Override  
  17.                 public void run() {  
  18.                     // TODO Auto-generated method stub  
  19.                     zeroDaoImpl.update1();  
  20.                 }  
  21.             }, "thread1-" + i).start();  
  22.             ;  
  23.             new Thread(new Runnable() {  
  24.   
  25.                 @Override  
  26.                 public void run() {  
  27.                     // TODO Auto-generated method stub  
  28.                     zeroDaoImpl.update2();  
  29.                 }  
  30.             }, "thread2-" + i).start();  
  31.             ;  
  32.         }  
  33.   
  34.     }  
  35.   
  36. }  

      这里建了10个线程,每个线程执行zeroDaoImpl.update1()或zeroDaoImpl.update2()方法来检验是否是线程安全的。
      PS:master的lineitem_record表中有lineitemId='11238',没有lineitemId='111';而slave的lineitem_record表中没有lineitemId='11238',有lineitemId='111'。预期结果是update1 lineitemId='11238' 的值是1,而update1 lineitemId='111'的值是0,update2 lineitemId='11238' 的值是0,而update2 lineitemId='111'的值是1。如果不是这样,那么就说明在多线程环境下,依然会出现方式二所面临的问题。
测试结果:

[java] view plain copy 
  1. update2 lineitemId='11238' : 0  
  2. update1 lineitemId='11238' : 1  
  3. update2 lineitemId='11238' : 0  
  4. update1 lineitemId='11238' : 1  
  5. update2 lineitemId='11238' : 0  
  6. update1 lineitemId='11238' : 1  
  7. update1 lineitemId='11238' : 1  
  8. update1 lineitemId='11238' : 1  
  9. update2 lineitemId='11238' : 0  
  10. update1 lineitemId='111' : 0  
  11. update2 lineitemId='111' : 1  
  12. update1 lineitemId='111' : 0  
  13. update1 lineitemId='111' : 0  
  14. update2 lineitemId='111' : 1  
  15. update2 lineitemId='111' : 1  
  16. update1 lineitemId='111' : 0  
  17. update2 lineitemId='11238' : 0  
  18. update2 lineitemId='111' : 1  
  19. update1 lineitemId='111' : 0  
  20. update2 lineitemId='111' : 1  

      由结果可知,利用Spring的AbstractRoutingDataSource可以解决多数据源的问题

贴上AbstractRoutingDataSource源码:

[java] view plain copy 
    1. package org.springframework.jdbc.datasource.lookup;  
    2.   
    3. import java.sql.Connection;  
    4. import java.sql.SQLException;  
    5. import java.util.HashMap;  
    6. import java.util.Map;  
    7. import javax.sql.DataSource;  
    8.   
    9. import org.springframework.beans.factory.InitializingBean;  
    10. import org.springframework.jdbc.datasource.AbstractDataSource;  
    11. import org.springframework.util.Assert;  
    12.   
    13. public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {  
    14.   
    15.     private Map<Object, Object> targetDataSources;  
    16.   
    17.     private Object defaultTargetDataSource;  
    18.   
    19.     private boolean lenientFallback = true;  
    20.   
    21.     private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();  
    22.   
    23.     private Map<Object, DataSource> resolvedDataSources;  
    24.   
    25.     private DataSource resolvedDefaultDataSource;  
    26.   
    27.     public void setTargetDataSources(Map<Object, Object> targetDataSources) {  
    28.         this.targetDataSources = targetDataSources;  
    29.     }  
    30.   
    31.     public void setDefaultTargetDataSource(Object defaultTargetDataSource) {  
    32.         this.defaultTargetDataSource = defaultTargetDataSource;  
    33.     }  
    34.   
    35.     public void setLenientFallback(boolean lenientFallback) {  
    36.         this.lenientFallback = lenientFallback;  
    37.     }  
    38.   
    39.     public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {  
    40.         this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());  
    41.     }  
    42.   
    43.   
    44.     public void afterPropertiesSet() {  
    45.         if (this.targetDataSources == null) {  
    46.             throw new IllegalArgumentException("Property 'targetDataSources' is required");  
    47.         }  
    48.         this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());  
    49.         for (Map.Entry entry : this.targetDataSources.entrySet()) {  
    50.             Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());  
    51.             DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());  
    52.             this.resolvedDataSources.put(lookupKey, dataSource);  
    53.         }  
    54.         if (this.defaultTargetDataSource != null) {  
    55.             this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);  
    56.         }  
    57.     }  
    58.   
    59.     protected Object resolveSpecifiedLookupKey(Object lookupKey) {  
    60.         return lookupKey;  
    61.     }  
    62.   
    63.     protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {  
    64.         if (dataSource instanceof DataSource) {  
    65.             return (DataSource) dataSource;  
    66.         }  
    67.         else if (dataSource instanceof String) {  
    68.             return this.dataSourceLookup.getDataSource((String) dataSource);  
    69.         }  
    70.         else {  
    71.             throw new IllegalArgumentException(  
    72.                     "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);  
    73.         }  
    74.     }  
    75.   
    76.   
    77.     public Connection getConnection() throws SQLException {  
    78.         return determineTargetDataSource().getConnection();  
    79.     }  
    80.   
    81.     public Connection getConnection(String username, String password) throws SQLException {  
    82.         return determineTargetDataSource().getConnection(username, password);  
    83.     }  
    84.   
    85.     @Override  
    86.     @SuppressWarnings("unchecked")  
    87.     public <T> T unwrap(Class<T> iface) throws SQLException {  
    88.         if (iface.isInstance(this)) {  
    89.             return (T) this;  
    90.         }  
    91.         return determineTargetDataSource().unwrap(iface);  
    92.     }  
    93.   
    94.     @Override  
    95.     public boolean isWrapperFor(Class<?> iface) throws SQLException {  
    96.         return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));  
    97.     }  
    98.   
    99.     protected DataSource determineTargetDataSource() {  
    100.         Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");  
    101.         Object lookupKey = determineCurrentLookupKey();  
    102.         DataSource dataSource = this.resolvedDataSources.get(lookupKey);  
    103.         if (dataSource == null && (this.lenientFallback || lookupKey == null)) {  
    104.             dataSource = this.resolvedDefaultDataSource;  
    105.         }  
    106.         if (dataSource == null) {  
    107.             throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");  
    108.         }  
    109.         return dataSource;  
    110.     }  
    111.   
    112.     protected abstract Object determineCurrentLookupKey();  
    113.   
    114. }