Java高并发实战:利用线程池和Redis实现高效数据入库

时间:2025-04-25 08:34:24
package io.jack.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Collects device real-time records into numbered batches held in the cache and persists each
 * batch to the database on a worker thread.
 *
 * <p>A batch is committed when it reaches {@code maxBatchCount} records or when
 * {@code batchTimeout} milliseconds have elapsed since it was opened, whichever happens first.
 * Records that cannot be batched or persisted are pushed to the failed-records cache list.
 *
 * <p>Thread-safety: the batch number, the batch counter and every cache operation that opens,
 * fills or rotates a batch are guarded by a single lock, because they are touched both by the
 * callers of {@link #saveRealTimeData} and by the timeout threads.
 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean, DisposableBean {

    /** Cache list receiving records that could not be batched or persisted. */
    private static final String FAILED_CACHE_KEY = "device:real_time:failed_records";

    /** Prefix of the cache key marking a batch as open; the batch number is appended. */
    private static final String DURATION_KEY_PREFIX = "device:real_time:batchDuration";

    /** Prefix of the cache list holding the records of a batch; the batch number is appended. */
    private static final String BATCH_KEY_PREFIX = "device:real_time:batch";

    /** How long {@link #destroy()} waits for submitted batch workers to finish. */
    private static final long SHUTDOWN_GRACE_SECONDS = 10L;

    /**
     * Guards {@link #batchNo}, {@link #batchCount} and the open/fill/rotate sequence of a batch.
     * Static because {@link #batchNo} is static.
     */
    private static final Object BATCH_LOCK = new Object();

    /**
     * Maximum number of records per batch.
     * NOTE(review): the placeholder has an empty property name, so in practice only the default
     * (800) appears to apply - confirm the intended configuration key.
     */
    @Value("${:800}")
    private int maxBatchCount;

    /**
     * Size of the worker thread pool.
     * NOTE(review): empty property name in the placeholder - confirm the intended key.
     */
    @Value("${:100}")
    private int maxBatchThreads;

    /**
     * Maximum age of an open batch, in milliseconds, before it is committed.
     * NOTE(review): empty property name in the placeholder - confirm the intended key.
     */
    @Value("${:3000}")
    private int batchTimeout;

    /** Number of records pushed into the current batch; guarded by {@link #BATCH_LOCK}. */
    private int batchCount = 0;

    /** Number of the current batch; guarded by {@link #BATCH_LOCK}. */
    private static long batchNo = 0;

    /** Executor that runs the batch workers; created in {@link #afterPropertiesSet()}. */
    private ExecutorService executorService = null;

    /** Cache service holding the batch lists and batch markers. */
    @Resource
    private CacheService cacheService;

    /** Service used to insert a batch of entities. */
    @Resource
    private DeviceRealTimeService deviceRealTimeService;

    /** Redis helper used to publish the latest record of each device. */
    @Resource
    private RedisUtils redisUtils;

    /**
     * Creates the fixed-size worker pool once the configured values have been injected.
     */
    @Override
    public void afterPropertiesSet() {
        executorService = Executors.newFixedThreadPool(maxBatchThreads);
    }

    /**
     * Commits the batch that is still open, then shuts the worker pool down, waiting up to
     * {@value #SHUTDOWN_GRACE_SECONDS} seconds for submitted workers to finish.
     */
    @Override
    public void destroy() {
        if (executorService == null) {
            return;
        }
        try {
            synchronized (BATCH_LOCK) {
                String durationKey = DURATION_KEY_PREFIX + batchNo;
                if (cacheService.exists(durationKey)) {
                    // Committing here also removes the marker, so the pending timeout thread
                    // finds nothing to do and never submits to the terminated pool.
                    dataStorage(durationKey, BATCH_KEY_PREFIX + batchNo, FAILED_CACHE_KEY);
                }
            }
        } catch (RuntimeException ex) {
            log.warn("[DB:FAILED] could not flush the open batch on shutdown: {}", ex.getMessage(), ex);
        }
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                log.warn("[DB] batch workers still running after {}s, forcing shutdown", SHUTDOWN_GRACE_SECONDS);
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds one record to the current batch, committing the batch when it becomes full, and
     * always publishes the record as the device's latest value afterwards.
     *
     * <p>If the record could not be added to the batch it is pushed to the failed-records list
     * instead. If it was added but the subsequent commit failed, it stays in the batch (the
     * commit is retried by the next call or by the timeout thread) and is not duplicated into
     * the failed-records list.
     *
     * @param deviceRealTimeDTO the record to store
     */
    public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        boolean queued = false;
        try {
            // The whole open/push/count/rotate sequence must be atomic: otherwise a record can
            // be pushed into a batch that a worker has already drained and deleted.
            synchronized (BATCH_LOCK) {
                String durationKey = DURATION_KEY_PREFIX + batchNo;
                String batchKey = BATCH_KEY_PREFIX + batchNo;
                // First record of a batch: mark it open and arm its timeout.
                if (!cacheService.exists(durationKey)) {
                    cacheService.put(durationKey, System.currentTimeMillis());
                    new BatchTimeoutCommitThread(batchKey, durationKey, FAILED_CACHE_KEY).start();
                }
                cacheService.lPush(batchKey, deviceRealTimeDTO);
                queued = true;
                if (++batchCount >= maxBatchCount) {
                    dataStorage(durationKey, batchKey, FAILED_CACHE_KEY);
                }
            }
        } catch (Exception ex) {
            log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: {}, DeviceRealTimeDTO: {}",
                    ex.getMessage(), JSON.toJSONString(deviceRealTimeDTO), ex);
            if (!queued) {
                cacheService.lPush(FAILED_CACHE_KEY, deviceRealTimeDTO);
            }
        } finally {
            updateRealTimeData(deviceRealTimeDTO);
        }
    }

    /**
     * Stores the record, serialized as JSON, under {@code real_time:<deviceId>} in Redis.
     *
     * @param deviceRealTimeDTO the record to publish
     */
    private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
    }

    /**
     * Closes the given batch, hands it to a worker and moves on to the next batch number.
     *
     * <p>The marker is deleted and the worker submitted before the in-memory state is rotated,
     * so if either step throws, the batch number and counter are unchanged and the commit can be
     * retried.
     *
     * @param durationKey    cache key marking the batch as open
     * @param batchKey       cache list holding the batch records
     * @param failedCacheKey cache list for records that fail to persist
     */
    private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
        synchronized (BATCH_LOCK) {
            cacheService.del(durationKey);
            executorService.execute(new BatchWorker(batchKey, failedCacheKey));
            // Wrap to 0 instead of ever reaching Long.MAX_VALUE.
            batchNo = (batchNo >= Long.MAX_VALUE - 1) ? 0 : batchNo + 1;
            batchCount = 0;
        }
    }

    /**
     * Drains one batch list from the cache and inserts its records in a single batch insert.
     */
    private class BatchWorker implements Runnable {

        private final String failedCacheKey;
        private final String batchKey;

        /**
         * @param batchKey       cache list holding the batch records
         * @param failedCacheKey cache list for records that fail to persist
         */
        public BatchWorker(String batchKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.failedCacheKey = failedCacheKey;
        }

        /**
         * Pops every record of the batch, inserts them, and deletes the batch list. On any
         * failure the records popped so far are pushed to the failed-records list.
         */
        @Override
        public void run() {
            final List<DeviceRealTimeDTO> drained = new ArrayList<>();
            try {
                DeviceRealTimeDTO next = cacheService.lPop(batchKey);
                while (next != null) {
                    drained.add(next);
                    next = cacheService.lPop(batchKey);
                }
                if (drained.isEmpty()) {
                    // Nothing was queued for this batch; skip the insert call entirely.
                    log.debug("[DB:BATCH_WORKER] batch {} is empty, nothing to persist", batchKey);
                    return;
                }
                long startMillis = System.currentTimeMillis();
                try {
                    List<DeviceRealTimeEntity> entities =
                            ConvertUtils.sourceToTarget(drained, DeviceRealTimeEntity.class);
                    deviceRealTimeService.insertBatch(entities);
                } finally {
                    cacheService.del(batchKey);
                    log.info("[DB:BATCH_WORKER] 批次:{},保存设备上报记录数:{}, 耗时:{}ms",
                            batchKey, drained.size(), System.currentTimeMillis() - startMillis);
                }
            } catch (Exception e) {
                log.warn("[DB:FAILED] 设备上报记录批量入库失败:{}, DeviceRealTimeDTO: {}",
                        e.getMessage(), drained.size(), e);
                requeueAsFailed(drained);
            }
        }

        /**
         * Pushes the given records to the failed-records list; a failure here is logged rather
         * than allowed to escape the worker.
         */
        private void requeueAsFailed(List<DeviceRealTimeDTO> records) {
            try {
                for (DeviceRealTimeDTO failedRecord : records) {
                    cacheService.lPush(failedCacheKey, failedRecord);
                }
            } catch (RuntimeException ex) {
                log.error("[DB:FAILED] could not push {} record(s) of batch {} to {}: {}",
                        records.size(), batchKey, failedCacheKey, ex.getMessage(), ex);
            }
        }
    }

    /**
     * Thread that waits {@code batchTimeout} milliseconds and then commits its batch if the
     * batch is still open.
     */
    class BatchTimeoutCommitThread extends Thread {

        private final String batchKey;
        private final String durationKey;
        private final String failedCacheKey;

        /**
         * @param batchKey       cache list holding the batch records
         * @param durationKey    cache key marking the batch as open
         * @param failedCacheKey cache list for records that fail to persist
         */
        public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.durationKey = durationKey;
            this.failedCacheKey = failedCacheKey;
            this.setName("batch-thread-" + batchKey);
        }

        /**
         * Sleeps for the batch timeout (or until interrupted, in which case the commit is
         * attempted immediately) and commits the batch if its marker still exists.
         */
        @Override
        public void run() {
            boolean interrupted = false;
            try {
                Thread.sleep(batchTimeout);
            } catch (InterruptedException e) {
                interrupted = true;
                log.error("[DB] 内部错误,直接提交:{}", e.getMessage(), e);
            }
            try {
                // Check and commit atomically so a concurrent size-triggered commit of the same
                // batch cannot make the batch number advance twice.
                synchronized (BATCH_LOCK) {
                    if (cacheService.exists(durationKey)) {
                        dataStorage(durationKey, batchKey, failedCacheKey);
                    }
                }
            } catch (RuntimeException ex) {
                log.warn("[DB:FAILED] timeout commit of batch {} failed: {}", batchKey, ex.getMessage(), ex);
            } finally {
                if (interrupted) {
                    // Restore the flag only after the commit so the cache calls above are not
                    // affected by it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}