分表
Sharding-JDBC
此处采用自定义分表策略,如分表策略较为简单(id/时间/hashMod),可直接在配置文件中用表达式完成分表,无需配置自定义规则。
配置方式见官方文档:
https://shardingsphere.apache.org/document/legacy/4.x/document/cn/manual/sharding-jdbc/configuration/config-spring-boot/
1.引入依赖
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>
2.数据源及分表原则配置
#启动项目报bean冲突配置
spring.main.allow-bean-definition-overriding=true
#分片sql打印
spring.shardingsphere.props.sql.show=true
#数据源名
spring.shardingsphere.datasource.names=master
#连接池
spring.shardingsphere.datasource.master.type=com.alibaba.druid.pool.DruidDataSource
#数据库驱动及连接配置
spring.shardingsphere.datasource.master.driver-class-name=com.mysql.cj.jdbc.Driver
spring.shardingsphere.datasource.master.url=jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?serverTimezone=UTC&allowMultiQueries=true&useSSL=false
spring.shardingsphere.datasource.master.username=root
spring.shardingsphere.datasource.master.password=123456
#分表集合 此处表示共61张表 后缀为0-60(0为脏数据表,1-60为业务分表)
spring.shardingsphere.sharding.tables.t_material_produce_report.actual-data-nodes=master.t_material_produce_report_$->{0..60}
#分表键 包含分表键的sql会走分表原则 否则走全表
spring.shardingsphere.sharding.tables.t_material_produce_report.table-strategy.standard.sharding-column=material_production_code
#分表策略 此处为自定义分表策略
spring.shardingsphere.sharding.tables.t_material_produce_report.table-strategy.standard.precise-algorithm-class-name=com.example.plus.config.MesShardingRule
3.分表策略配置
package com.example.plus.config;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.springframework.stereotype.Component;
import java.util.Collection;
/**
 * Precise sharding rule for the MES material production report table.
 *
 * <p>Physical tables share the prefix {@code t_material_produce_report_}:
 * <ul>
 *   <li>suffix 1-27: codes containing MNZ1/ZNZ1 (1907) or MNZ2/ZNZ2/PYKZN/PYKMN (1908);</li>
 *   <li>suffix 28-60: codes containing H1YB/H1YL/H2YB/H2YL;</li>
 *   <li>suffix 0: every code that matches none of the markers above (dirty data).</li>
 * </ul>
 * The sharding column is {@code material_production_code}; within a group the table is chosen
 * by the hash code of the whole value modulo the group's table count.
 *
 * @Author:
 * @Date: create in 2022/6/30 14:18
 */
@Component
public class MesShardingRule implements PreciseShardingAlgorithm<String> {
    /** Number of tables in the first group (1907 & 1908), suffixes 1-27. */
    private static final int SHARDING_NUM_ONE = 27;
    /** Number of tables in the second group (2576 & 2577), suffixes 28-60. */
    private static final int SHARDING_NUM_TWO = 33;
    /** Common prefix of every physical table. */
    private static final String PREFIX = "t_material_produce_report_";
    /** Table that receives values matching no known marker. */
    private static final String DIRTY_DATA_TABLE = "t_material_produce_report_0";

    /**
     * Resolves the physical table for one sharding value.
     *
     * @param collection           candidate table names supplied by the framework (not consulted)
     * @param preciseShardingValue holder of the sharding column value
     * @return the physical table name the statement should be routed to
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<String> preciseShardingValue) {
        return getTableName(preciseShardingValue.getValue());
    }

    /**
     * Maps a production code to its table name.
     *
     * @param value sharding column value; {@code null} is treated as dirty data
     * @return physical table name
     */
    private String getTableName(String value) {
        if (value == null) {
            // No code to classify: route to the dirty-data table instead of failing with an NPE.
            return DIRTY_DATA_TABLE;
        }
        // 1908 (MNZ2/ZNZ2/PYKZN/PYKMN) and 1907 (MNZ1/ZNZ1) share the table range 1-27.
        if (value.contains("MNZ2") || value.contains("ZNZ2") || value.contains("PYKZN") || value.contains("PYKMN")
                || value.contains("MNZ1") || value.contains("ZNZ1")) {
            return PREFIX + (bucket(value, SHARDING_NUM_ONE) + 1);
        }
        if (value.contains("H1YB") || value.contains("H1YL") || value.contains("H2YB") || value.contains("H2YL")) {
            return PREFIX + (bucket(value, SHARDING_NUM_TWO) + SHARDING_NUM_ONE + 1);
        }
        return DIRTY_DATA_TABLE;
    }

    /**
     * Returns a bucket index in {@code [0, buckets)} for the value's hash code.
     *
     * <p>The modulo is taken before the absolute value: {@code Math.abs(Integer.MIN_VALUE)} is
     * still negative, so {@code Math.abs(hash) % buckets} could produce a negative index. For
     * every other hash code this yields exactly the same bucket as {@code Math.abs(hash) % buckets},
     * so rows that are already stored keep their table.
     */
    private static int bucket(String value, int buckets) {
        return Math.abs(value.hashCode() % buckets);
    }
}
实现PreciseShardingAlgorithm接口,并重写doSharding方法即可。doSharding的返回值为目标表名,参数preciseShardingValue中携带分表键的值,可依据该值实现分表逻辑,使sql操作落到指定表中,实现分表功能。
此处为hash取模分表。
4.分表原则
分表原则一般为自增主键/时间或关键字段hash取模。
自增键易于扩容,数据分布均匀,但热点数据容易落在单表上。
时间分表,同样易于扩容,存在热点数据访问问题,新数据访问频次高于旧数据,分表数据不一定均匀,由时间段内业务决定。
hash取模,用常用查询条件的hashcode对分表数取模来分表,数据分布均匀,但扩容麻烦,需要将旧数据重新取模分表,适用于可预估需承载数据量的场景。
5.使用
项目整合了mybatis-plus,直接使用即可。
/**
 * Looks up report rows by the sharding key.
 *
 * @param code value compared for equality with the material_production_code column
 * @return the rows the mapper returns for that code
 */
@RequestMapping("/getListByCode")
@ResponseBody
public List<MaterialProduceReport> getListByCode(String code){
    // Equality on the sharding column is the only condition of this query.
    return materialProduceReportMapper.selectList(
            new QueryWrapper<MaterialProduceReport>().eq("material_production_code", code));
}
查询条件中带有分表键,会依据设定的分表策略确定去哪张子表查询。
查询条件中不带分表键,会走全表查询。
6.注意事项
1.使用mybatis/mybatis-plus的分页插件查询mysql数据库时,当分表数过多且单表数据量过大时,会出现页码越往后查询速度越慢的情况,由于分页是基于limit实现,查询结果为全字段,分页sql为 :
SELECT * FROM XXX WHERE ... LIMIT 80000,20;
此时*中的字段含有非索引字段,导致查询效率极低。
代码中可进行改写
/**
 * Pages through the report rows ordered by id, without restricting the selected columns.
 *
 * @param current page number handed to Page
 * @param size    page size handed to Page
 * @return the page returned by the mapper
 */
@RequestMapping("/listByPaging")
@ResponseBody
public IPage<MaterialProduceReport> listByPaging(int current,int size){
    QueryWrapper<MaterialProduceReport> orderedById = new QueryWrapper<>();
    orderedById.orderByAsc("id");
    Page<MaterialProduceReport> pageRequest = new Page<>(current,size);
    return materialProduceReportMapper.selectPage(pageRequest, orderedById);
}
改写为
/**
 * Pages through all report rows in two steps: the paged query selects only id and
 * material_production_code, then each row of the page is loaded in full by those two values.
 *
 * @param current page number handed to Page
 * @param size    page size handed to Page
 * @return the page from the first query, with its records replaced by the fully loaded rows
 */
@RequestMapping("/listByPaging")
@ResponseBody
public IPage<MaterialProduceReport> listByPaging(int current,int size){
    // Step 1: paged query restricted to id and the sharding key.
    QueryWrapper<MaterialProduceReport> keyQuery = new QueryWrapper<>();
    keyQuery.orderByAsc("id");
    keyQuery.select("id","material_production_code");
    Page<MaterialProduceReport> pageRequest = new Page<>(current,size);
    IPage<MaterialProduceReport> result = materialProduceReportMapper.selectPage(pageRequest, keyQuery);

    // Step 2: fetch every column of each row, filtering by sharding key and id.
    List<MaterialProduceReport> fullRows = new ArrayList<>();
    for (MaterialProduceReport keyRow : result.getRecords()) {
        QueryWrapper<MaterialProduceReport> rowQuery = new QueryWrapper<>();
        rowQuery.eq("material_production_code", keyRow.getMaterialProductionCode());
        rowQuery.eq("id", keyRow.getId());
        fullRows.add(materialProduceReportMapper.selectOne(rowQuery));
    }

    result.setRecords(fullRows);
    return result;
}
先分页查询出建立索引的主键字段及分表字段,再依据主键及分表字段查询所有字段,此时sql为
SELECT id,material_production_code FROM XXX WHERE ... LIMIT 80000,20;
查询效率可以得到提升。
2.启动时报错,但不影响项目运行,消除报错。
仅在整合到现行系统中出现,可能是版本问题,未深究。
异常内容:
org.springframework.dao.InvalidDataAccessApiUsageException: ConnectionCallback; isValid; nested exception is java.sql.SQLFeatureNotSupportedException: isValid
at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:96)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:79)
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:342)
at org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator.isConnectionValid(DataSourceHealthIndicator.java:130)
at org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator.doDataSourceHealthCheck(DataSourceHealthIndicator.java:116)
at org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator.doHealthCheck(DataSourceHealthIndicator.java:100)
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77)
at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
at sun.reflect.GeneratedMethodAccessor136.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLFeatureNotSupportedException: isValid
at org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationConnection.isValid(AbstractUnsupportedOperationConnection.java:157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.jdbc.core.JdbcTemplate$CloseSuppressingInvocationHandler.invoke(JdbcTemplate.java:1614)
at com.sun.proxy.$Proxy313.isValid(Unknown Source)
at org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator.isConnectionValid(DataSourceHealthIndicator.java:134)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:334)
... 46 common frames omitted
解决方法,创建配置类:
import com.taobao.arthas.core.util.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthContributorAutoConfiguration;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.jdbc.DataSourceHealthIndicator;
import org.springframework.boot.jdbc.metadata.DataSourcePoolMetadataProvider;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.Map;
/**
* @Author:
* @Description: 整合sharding-jdbc启动异常
* @Date: create in 2022/7/8 8:54
*/
@Configuration
public class DataSourceHealthConfig extends DataSourceHealthContributorAutoConfiguration {
// @Value("${:select 1}")
@Value("${.-query:select 1}")
private String defaultQuery;
public DataSourceHealthConfig(Map<String, DataSource> dataSources, ObjectProvider<DataSourcePoolMetadataProvider> metadataProviders) {
super(dataSources, metadataProviders);
}
@Override
protected AbstractHealthIndicator createIndicator(DataSource source) {
DataSourceHealthIndicator indicator = (DataSourceHealthIndicator) super.createIndicator(source);
if (!StringUtils.hasText(indicator.getQuery())) {
indicator.setQuery(defaultQuery);
}
return indicator;
}
}
7.现行系统分表改造
现有系统为单表设计,考虑到数据量增加过多,需要进行分表改造。
方案如下
1.导出现有数据(500w左右) 30 m
2.导入到备用表 60 m
3.分备用表数据 10 h
4.正式环境建立分表 导入备份分表数据
5.正式环境建立临时表
需停机 6.停机 部署新代码(所有操作走分表)
需停机 7.导出步骤1之后新增的数据至临时表(5w左右) 5 m
需停机 8.分临时表数据至分表 10 m
9.启动项目
mybatis-plus
mybatis-plus动态表名也能实现分表功能,且较为简单。但是好像不能走全表查询(?),局限性较高,未采用。
1.设置分表原则
package com.example.plus.config;
import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
/**
 * Table-name handler that appends {@code "_" + (id mod N)} to the logical table name.
 *
 * <p>The id is handed over per thread through {@link #setId(Integer)} and is consumed by the
 * next {@link #dynamicTableName(String, String)} call on the same thread.
 *
 * @Author:
 * @Date: create in 2022/6/27 9:06
 */
public class IdModTableNameParser implements TableNameHandler {
    /** Id for the next statement executed on the current thread. */
    private static final ThreadLocal<Integer> id = new ThreadLocal<Integer>();

    /** Number of physical tables; always positive. */
    private final Integer mod;

    /**
     * Stores the id that selects the physical table for the next statement on this thread.
     *
     * @param idValue id used to compute the table suffix
     */
    public static void setId(Integer idValue) {
        id.set(idValue);
    }

    /**
     * @param modValue number of physical tables; must be a positive number
     * @throws IllegalArgumentException if {@code modValue} is null or not positive
     */
    IdModTableNameParser(Integer modValue) {
        if (modValue == null || modValue <= 0) {
            // Fail at configuration time rather than on the first statement.
            throw new IllegalArgumentException("mod must be a positive number, got " + modValue);
        }
        this.mod = modValue;
    }

    /**
     * Builds the physical table name for the id previously set on this thread.
     *
     * @param sql       statement being executed (not consulted)
     * @param tableName logical table name
     * @return {@code tableName + "_" + suffix}, with suffix in {@code [0, mod)}
     * @throws IllegalStateException if no id was set on the current thread
     */
    @Override
    public String dynamicTableName(String sql, String tableName) {
        Integer idValue = id.get();
        if (idValue == null) {
            throw new IllegalStateException("请设置id值");
        }
        // Drop the thread-local entry (not just its value) so a reused thread starts clean.
        id.remove();
        // floorMod keeps the suffix non-negative for negative ids; for non-negative ids it
        // equals the plain % result.
        return tableName + "_" + Math.floorMod(idValue, mod);
    }
}
实现TableNameHandler接口,重写dynamicTableName方法,返回值为分表名。
2.配置分表原则生效
package com.example.plus.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
import com.baomidou.mybatisplus.extension.plugins.inner.BlockAttackInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 * Registers the MyBatis-Plus interceptor chain: dynamic table names, pagination and
 * protection against unconditional update/delete.
 *
 * @Author:
 * @Date: create in 2022/6/27 9:15
 */
@Configuration
// NOTE(review): basePackages is empty here, which looks like a placeholder - set it to the
// package that contains the mapper interfaces.
@MapperScan(basePackages = "")
public class MybatisConfig {
    /**
     * Builds the interceptor with its inner interceptors.
     *
     * <p>The dynamic-table-name interceptor is added first. MyBatis-Plus guidance is to register
     * interceptors that rewrite SQL ahead of pagination and to put non-rewriting checks last,
     * so that the pagination count query is derived from the already rewritten statement.
     *
     * @return the configured interceptor
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();

        // Dynamic table names: one handler per logical table name.
        HashMap<String, TableNameHandler> handlersByTable = new HashMap<>();
        handlersByTable.put("t_log", new IdModTableNameParser(3));
        DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor = new DynamicTableNameInnerInterceptor();
        dynamicTableNameInnerInterceptor.setTableNameHandlerMap(handlersByTable);
        interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor);

        // Pagination.
        PaginationInnerInterceptor paginationInnerInterceptor = new PaginationInnerInterceptor(DbType.MYSQL);
        paginationInnerInterceptor.setMaxLimit(500L);
        paginationInnerInterceptor.setOverflow(true);
        interceptor.addInnerInterceptor(paginationInnerInterceptor);

        // Block update/delete statements that would affect the whole table.
        interceptor.addInnerInterceptor(new BlockAttackInnerInterceptor());

        return interceptor;
    }
}
map的key为逻辑表名,value为分表原则。
3.使用
/**
 * Inserts a sample log row; the id is first handed to IdModTableNameParser for this thread.
 *
 * @param id numeric id as text; also stored as the row's oid
 */
@RequestMapping("/insert")
@ResponseBody
public void insert(String id){
    // Must happen before the mapper call below.
    IdModTableNameParser.setId(Integer.valueOf(id));

    LogEntity entry = new LogEntity();
    entry.setOid(id)
            .setDuration("1")
            .setEndTime("2022-06-24 16:50:30")
            .setStartTime("2022-06-24 16:50:30")
            .setErrorMessage("1")
            .setEventName("1");

    testMapper.insert(entry);
}