多线程查询数据,将结果存入到redis中,最后批量从redis中取数据批量插入数据库中【我】

时间:2022-06-30 16:24:50

多线程查询数据,将结果存入到redis中,最后批量从redis中取数据批量插入数据库中

package com.xxx.xx.reve.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import com.xxx.xx.common.service.ParentService;
import com.xxx.xx.redis.service.JedisClient;
import com.xxx.xx.reve.vo.RevenueSt;
import com.xxx.xx.util.PropertieUtil; import net.sf.json.JSONObject; @Service
@SuppressWarnings("rawtypes")
public class RevenueStServiceImpl2 extends ParentService implements RevenueStService{
private final static Logger logger = LoggerFactory.getLogger(RevenueStServiceImpl.class); @Autowired
private JedisClient jedisClient; //预处理对象转String方法
private String o2s(Object o) {
if(o!=null&&!"".equals(o)&&!"null".equals((o+"").trim().toLowerCase())) {
return o+"";
}
return null;
} //生成总收入查询参数
private String genAllIncomeParam(String s) {
StringBuffer sb = new StringBuffer(); for (int i = 1; i <= getMonth(s); i++) {
sb.append("SUM(TY_"+i+")+");
}
String s2 = sb.toString();
String res = s2.substring(0, s2.length()-1);
logger.info("总收入查询参数:{}",res);
return res;
} //截取字符串获取月份信息 "201912";
private Integer getMonth(String s) {
//截取后两位
s = s.substring(s.length() -2,s.length());
char c1 = s.charAt(0);
char c2 = s.charAt(1);
if ((c1+"").equals("0")) {
return Integer.parseInt(c2+"");
}
return Integer.parseInt(s);
} //线程池
private ExecutorService exec = Executors.newFixedThreadPool(Integer.parseInt(PropertieUtil.getConfig("revenue.threadNum")));
//redis中缓存队列key
private String redisKeyName = "EVENUE_STATISTICS_LIST"; //获取统计数据方法
@SuppressWarnings("unchecked")
@Override
public void getStatisticsData() { //清空缓存
jedisClient.del(redisKeyName);
//普通查询参数
Map param = new HashMap();
param.put("tablename","REVENUE_STATISTICS_RES");
//将原表数据备份到历史表
Integer countRevenueNum = this.selectOne("ds2", "reve.countRevenueNum", param);
logger.info("----历史表revenue_statistics_res数据量为{}",countRevenueNum);
if (countRevenueNum!=null && countRevenueNum >Integer.parseInt(PropertieUtil.getConfig("revenue.copyLimitNum"))) {
//删除历史表
try {
this.update("ds2", "reve.dropHisTable", param);
logger.info("----历史表revenue_statistics_res_his删除成功");
} catch (Exception e) {
logger.info("----历史表revenue_statistics_res_his不存在");
}
//创建历史表并复制数据
// CREATE TABLE revenue_statistics_res_his AS SELECT * FROM revenue_statistics_res
this.update("ds2", "reve.copyToHisTable", param);
logger.info("----历史表revenue_statistics_res_his数据复制成功");
}
// 清空原表
// truncate table ${tablename}
this.update("ds2", "reve.truncateTable", param);
//记录开始时间
long t1 = System.currentTimeMillis();
//准备插入结果表对象列表
List<RevenueSt> preInsertList = new ArrayList<RevenueSt>();
// 产品分类:10:移动;20:宽带;30:固化;40:电路出租;50:网元出租;-1:其他;
Map<String, String> productMap = new HashMap<String,String>();
productMap.put("10", "移动");
productMap.put("20", "宽带");
productMap.put("30", "固话");
productMap.put("40", "电路");
productMap.put("50", "网元");
productMap.put("-1", "其他");
//查询eda表的参数
Map ep = new HashMap();
//查询行业列表,得到所有行业信息,即 行业id,父id,级别
List<Map> industryList = this.selectList("ds2", "reve.queryAllIndusty",ep);
//全部人数
Integer allTotalNum = this.selectOne("ds2", "reve.queryAllTotal",ep);
//当前最大账期(应该有收入数据的年月)
String accountDay = this.selectOne("ds2", "reve.getMaxAccountDay",ep);
logger.info("查询到当前最新账期:{}",accountDay);
String genAllIncomeParam = genAllIncomeParam(accountDay);
logger.info("拼接完当前最新账期查询参数:{}",genAllIncomeParam);
String sql2 = "SELECT /*+ PARALLEL(12) */ ("+genAllIncomeParam+") FROM EDA_CUST_INC"; //查询本年累计全部客户收入
String allIncome = this.selectOne("ds2", "reve.queryAllIncome",sql2);
//获取省份列表,获取省份对应的Nub,eda表中只有Nub
List<Map> provList = this.selectList("ds2", "reve.queryTotalNum",param);
//获取省份对应的本地网列表
List<Map> cityList = new ArrayList<Map>();
for (Map map : provList) {
param.put("COMMON_REGION_ID",o2s(map.get("REGION_ID")));
Map provInfo = this.selectOne("ds2", "reve.queryRegionNbr",param);
String REGION_ID = o2s(map.get("REGION_ID"));
List<Map> subCity = this.selectList("ds2", "reve.querySubCity",REGION_ID);
for (Map city : subCity) {
city.put("PAR_NBR", provInfo.get("REGION_NBR"));
}
cityList.addAll(subCity);
} //市级数据(本地网)--------
//遍历市列表获取统计数据
for (Map city : cityList) {
//产品
for (String produce : productMap.keySet()) {
//行业
for (Map industry : industryList) {
ep.clear();
//查询参数:地区行业
ep.put("STD_PRVNCE_CD", o2s(city.get("PAR_NBR")));
ep.put("STD_LATN_CD", o2s(city.get("REGION_NBR")));
ep.put("PROD_TYPE", produce);
//加入行业类型相关参数
ep.putAll(industry);
//返回数据
ep.put("PROVINCE_REGION_ID", o2s(city.get("PAR_REGION_ID")));
ep.put("CITY_REGION_ID", o2s(city.get("COMMON_REGION_ID")));
ep.put("REGION_NAME", o2s(city.get("REGION_NAME")));
//存入公共数据
ep.put("ALL_CUST_NUM", allTotalNum);
ep.put("ALL_INCOME", allIncome);
//查询参数:身份证临时
ep.put("IDENTITY_TYPE", 1);
addVo(ep,preInsertList);
//查询参数:身份证正式
ep.put("IDENTITY_TYPE", 2);
addVo(ep,preInsertList);
}
}
} exec.shutdown();
while (true) {
if (exec.isTerminated()) {
logger.info("--》--收入统计获取数据,所有子线程都结束了,共计耗时:{}",(System.currentTimeMillis()-t1));
break;
}
}
//批量插入数据库
insertBatchRevenue();
} //从redis中批量获取数据,批量插入统计结果表,
private void insertBatchRevenue() {
//每次取出数量
int onceNum = Integer.parseInt(PropertieUtil.getConfig("revenue.batchNum"));
//统计个数
int sum = 0;
//开始角标
int startIndex = 0;
//步进
int step = onceNum-1;
//结束角标
int endIndex = 0;
// 按照范围取,每次取出onceNum条
for (int i = 1; ; i++) {
endIndex = startIndex+step;
logger.info("----第"+i+"次取数据,角标起始:"+startIndex+"---"+endIndex);
List<String> lrange = jedisClient.lrange(redisKeyName, startIndex, endIndex);
//如果取完了退出循环
if (lrange==null || lrange.size()==0) {
break;
}
//统计计数
sum += lrange.size();
//插入数据库
//遍历lrange,转成Vo,放入新的list中,插入数据库
//判断当前的表是哪个表,执行对应的一个表的批量插入逻辑
try {
List<RevenueSt> paramList = new ArrayList<RevenueSt>();
for (String s : lrange) {
RevenueSt vo = (RevenueSt)JSONObject.toBean(JSONObject.fromObject(s), RevenueSt.class);
paramList.add(vo);
}
long t1 = System.currentTimeMillis();
//批量插入结果表
this.insert("ds2", "reve.insertRevenue", paramList);
logger.info("----第"+i+"次取数据,插入完成,耗时毫秒:"+(System.currentTimeMillis()-t1));
} catch (Exception e) {
logger.error("----批量统计数据插入发生错误,当前队列名:{},批次起始角标:{}~{},异常详情:{}",redisKeyName,startIndex,endIndex,e);
e.printStackTrace();
}
//起始位置
startIndex = endIndex+1;
}
logger.info("----插入完成,共插入:{} 条记录",sum);
} //复制map,为多线程准备
private Map copyMap(Map<String, Object> oldMap) {
HashMap pMap = new HashMap();
for ( Map.Entry<String,Object> entry : oldMap.entrySet()) {
pMap.put(entry.getKey(), entry.getValue());
}
return pMap;
} //因为单次查询比较慢,所以开启多线程查询,每次查询完将结果存入redis的list中(多线程中只查询,不插入或更新数据库会避免数据库锁表,大大提升效率)
private void addVo(Map ep,List<RevenueSt> list) {
//这里一定要在循环内 new 参数map,以确保每个线程中使用的都是单独new的参数Map对象,否则会有并发问题
//复制参数对象
Map epT = copyMap(ep);
//开启多线程
Runnable task = new Runnable() {
@Override
public void run() {
long t1 = System.currentTimeMillis();
logger.info("--》--收入统计线程: {} -- 开始执行",Thread.currentThread().getName());
// 查询数据,查询数据库的动作要封装到一个方法中,直接在多线程的run方法中写 this.selectOne("ds2", "reve.queryIncome",ep); 会报错
RevenueSt vo = genVo(epT);
//将查询结果对象转为 json 字符串 存入 redis队列中
String upJson = JSONObject.fromObject(vo).toString();
jedisClient.rpush(redisKeyName, upJson);
//本地同步处理方法,不用redis本地同步方法版本,比用redis慢太多了
// synchronized (RevenueStServiceImpl.class) {
// //添加到集合
// list.add(vo);
// if (list.size()>=500) {
// //批量插入结果表
// insertRevenue(list);
// list.clear();
// }
// }
logger.info("--》--收入统计线程: {} -- 结束,耗时{}",Thread.currentThread().getName(),System.currentTimeMillis()-t1);
}
};
exec.submit(task);
} /**
* 查询数据并生成临时待插入对象
* @param ep EDA表查询条件
* @param comp 公共参数,用于拼接结果vo
* @return
*/
private RevenueSt genVo(Map ep) {
//返回对象
RevenueSt re = new RevenueSt();
//查序列
String eq = this.selectOne("ds2", "reve.queryRevenueEQ",ep);
re.setID(eq);
//本年累计全部客户收入
re.setALL_INCOME(o2s(ep.get("ALL_INCOME")));
//全部政企客户数
re.setALL_CUST_NUM(o2s(ep.get("ALL_CUST_NUM")));
//备注
re.setREMARK(o2s(ep.get("REMARK")));
//条件
//省Id
re.setPROVINCE_REGION_ID(o2s(ep.get("PROVINCE_REGION_ID")));
re.setCITY_REGION_ID(o2s(ep.get("CITY_REGION_ID")));
//地区名称++
re.setREGION_NAME(o2s(ep.get("REGION_NAME")));
//地区级别
re.setREGION_GRADE(o2s(ep.get("REGION_GRADE")));
//身份证临时/正式
re.setIDENTITY_TYPE(o2s(ep.get("IDENTITY_TYPE")));
//行业
re.setINDUSTRY_TYPE_ID(o2s(ep.get("INDUSTRY_TYPE_ID")));
//行业代码++
re.setINDUSTRY_TYPE_CODE(o2s(ep.get("INDUSTRY_TYPE_CODE")));
//行业名称++
re.setINDUSTRY_TYPE_NAME(o2s(ep.get("INDUSTRY_TYPE_NAME")));
re.setPAR_INDUSTRY_TYPE_ID(o2s(ep.get("PAR_INDUSTRY_TYPE_ID")));
re.setINDUSTRY_TYPE_GRADE(o2s(ep.get("INDUSTRY_TYPE_GRADE")));
//产品类型
re.setPROD_TYPE(o2s(ep.get("PROD_TYPE")));
//查询EDA表
Map income = this.selectOne("ds2", "reve.queryIncome",ep);
logger.info("---查询结果:"+income);
//查询合规客户数
re.setAUDIT_CUST_NUM(o2s(income.get("CUSTNUM")));
//查询合规客户身份证数
re.setAUDIT_CUST_PARTY_NUM(o2s(income.get("PARTYNUM")));
//查询收入列表(今年、去年全部12个月)
re.setTY_1(o2s(income.get("TY1")));
re.setTY_2(o2s(income.get("TY2")));
re.setTY_3(o2s(income.get("TY3")));
re.setTY_4(o2s(income.get("TY4")));
re.setTY_5(o2s(income.get("TY5")));
re.setTY_6(o2s(income.get("TY6")));
re.setTY_7(o2s(income.get("TY7")));
re.setTY_8(o2s(income.get("TY8")));
re.setTY_9(o2s(income.get("TY9")));
re.setTY_10(o2s(income.get("TY10")));
re.setTY_11(o2s(income.get("TY11")));
re.setTY_12(o2s(income.get("LY12")));
re.setLY_1(o2s(income.get("LY1")));
re.setLY_2(o2s(income.get("LY2")));
re.setLY_3(o2s(income.get("LY3")));
re.setLY_4(o2s(income.get("LY4")));
re.setLY_5(o2s(income.get("LY5")));
re.setLY_6(o2s(income.get("LY6")));
re.setLY_7(o2s(income.get("LY7")));
re.setLY_8(o2s(income.get("LY8")));
re.setLY_9(o2s(income.get("LY9")));
re.setLY_10(o2s(income.get("LY10")));
re.setLY_11(o2s(income.get("LY11")));
re.setLY_12(o2s(income.get("LY12")));
logger.info("---------------拼装对象:"+re);
return re;
} //批量插入统计结果表,本地list缓存版本
// private void insertBatchRevenue2(List<RevenueSt> preInsertList) {
// System.out.println("入参集合数量:"+preInsertList.size());
// //统计数量
// int num = 0;
// //每批个数
// int batchLen = 2000;
// // 每批集合临时存储
// List<RevenueSt> batchlist = new ArrayList<RevenueSt>();
// for (int i = 0; i < preInsertList.size(); i++) {
// RevenueSt item = preInsertList.get(i);
// batchlist.add(item);
// if ((i + 1) % batchLen == 0) {
// insertRevenue(batchlist);
// num += batchlist.size();
// System.out.println("----本次插入数量:"+num);
// batchlist.clear();// 处理完清空批次集合
// }
// }
// // 处理最后一批数据
// if (batchlist.size() > 0) {
// //批量插入结果表
// insertRevenue(batchlist);
// num += batchlist.size();
// }
// System.out.println("----一共插入数量:" + num);
// } }

注意:

如果数据量在100万以下可以,一直往redis的一个list中存,最后处理,

如果数据量大于100万,可能撑爆redis,这时,可以 单独开启一守护线程,里面用while true 循环 加 wait一定时间,定时从 list的头部取数据,批量插入数据库(模拟消息队列),等所有的查询线程都结束,再最后执行一次从redis中取剩下的所有数据批量插入。

多线程查询数据,将结果存入到redis中,最后批量从redis中取数据批量插入数据库中【我】的更多相关文章

  1. php中CURL技术模拟登陆抓取数据实战,抓取某校教务处学生成绩。

    这两天有基友要php中curl抓取教务处成绩的源码,用于微信公众平台的开发.下面笔者只好忍痛割爱了.php中CURL技术模拟登陆抓取数据实战,抓取沈阳工学院教务处学生成绩. 首先,教务处登录需要验证码 ...

  2. 【hibernate spring data jpa】执行了save&lpar;&rpar;方法 sql语句也执行了,但是数据并未插入数据库中

    执行了save()方法  sql语句也执行了,但是数据并未插入数据库中 解决方法: 是因为执行了save()方法,也执行了sql语句,但是因为使用的是 @Transactional 注解,不是手动去提 ...

  3. SqlBulkCopy将DataTable中的数据批量插入数据库中

    #region 使用SqlBulkCopy将DataTable中的数据批量插入数据库中 /// <summary> /// 注意:DataTable中的列需要与数据库表中的列完全一致.// ...

  4. php中封装的curl函数&lpar;抓取数据&rpar;

    介绍一个封闭好的函数,封闭了curl函数的常用步骤,方便抓取数据. 代码如下: <?php /** * 封闭好的 curl函数 * 用途:抓取数据 * edit by www.jbxue.com ...

  5. SqlBulkCopy实现大容量数据快速插入数据库中

    一般情况下,我们手写sqlhelper类,在此类中定义一个数据插入到数据库的一个方法.将数据库连接密封在using()的语句中.using显示了Idispose接口.可以及时释放数据库连接资源.代码如 ...

  6. Java中获取刚插入数据库中的数据Id(主键,自动增长)

    public int insert(String cName, String ebrand, String cGender) { String sql = "insert into Clot ...

  7. mysql循环插入数据库中数据。

    DELIMITER ;; CREATE PROCEDURE test_insert () BEGIN DECLARE i INT DEFAULT 1; WHILE i<100 DO insert ...

  8. sql 数据库中只靠一个数据,查询到所在表和列名

    有时候我们想通过一个值知道这个值来自数据库的哪个表以及哪个字段,在网上搜了一下,找到一个比较好的方法,通过一个存储过程实现的.只需要传入一个想要查找的值,即可查询出这个值所在的表和字段名. 前提是要将 ...

  9. MySQL 查询某个数据库中所有包含数据记录的表名

    MySQL 查询某个数据库中所有包含数据记录的表名 有时根据实际应用需要,需要对数据进行备份. 如果一个数据库中有很多数据表,但是只想备份包含数据记录的那些表数据(空表不做数据备份). 如果通过如下S ...

随机推荐

  1. UVM的类库

    [转]http://www.asicdv.com/ 一个UVM验证平台可以看成由多个模块组合在一起的,这和以前的verilog代码,以及verilog结合其它各种语言的验证手段在理念上是一样的,最大的 ...

  2. JPEG格式

    Jpg文件格式[参考] 微处理机中的存放顺序有正序(big endian)和逆序(little endian)之分.正序存放就是高字节存放在前低字节在后,而逆序存放就是低字节在前高字节在后.例如,十六 ...

  3. response实现验证码图片

    package com.zhangbz.response; import java.awt.Color; import java.awt.Font; import java.awt.Graphics2 ...

  4. Unity 5&period;x---00使用重力

    Unity 5.x---00使用重力 步骤一: 打开一个工程(导入Unity自带的资源),并创建并配置好必要的GameObject ,如下图: 步骤二: 1.创建一个Cube,使其位于平面上方.    ...

  5. Note &vert; javascript权威指南&lbrack;第六版&rsqb; 第2章:词法结构

      语法结构规定了诸如变量名是什么样的.怎么写注释,以及程序语句之间如何分隔等规则.本章用很短的篇幅来介绍JavaScript的词法结构.   2.1.字符集   JavaScript程序是用Unic ...

  6. php单例模式与工厂模式

    单例模式:单例模式又称为职责模式,它用来在程序中创建一个单一功能的访问点,通俗地说就是实例化出来的对象是唯一的. 所有的单例模式至少拥有以下三种公共元素:1. 它们必须拥有一个构造函数,并且必须被标记 ...

  7. 服务器Windows 2008R2 C盘清理

    今天因为连服务器的时间慢了很多,然后看了一下C盘的空间,OMG剩下222K.然后一直上网找解决方案. 按照惯例,应该开一个360看看,C盘清理啊,搬家什么的.360告知的竟然是没有可以搬移的,所以,这 ...

  8. 腾讯技术分享:GIF动图技术详解及手机QQ动态表情压缩技术实践

    本文来自腾讯前端开发工程师“ wendygogogo”的技术分享,作者自评:“在Web前端摸爬滚打的码农一枚,对技术充满热情的菜鸟,致力为手Q的建设添砖加瓦.” 1.GIF格式的历史 GIF ( Gr ...

  9. HTML5之日历控件

    HTML5定义了几个与日期有关的新控件.支持日期控件的浏览器会提供一个方便的下拉式日历,供用户选择. 以下测试和截图都是在谷歌浏览器完成的,其他浏览器可能略有差异. 1.日期时间控件 HTML代码: ...

  10. 第七十七课 最小生成树(Kruskal)

    添加kruskal算法: #ifndef GRAPH_H #define GRAPH_H #include "Object.h" #include "SharedPoin ...