redis发布订阅模式,消费端重复消费问题解决

时间:2025-05-10 22:30:08
  • package com.***.;
  • import ;
  • import com.***.jcpt.common.;
  • import com.***.jcpt.common.;
  • import com.***.;
  • import com.***.;
  • import com.***.;
  • import com.***.;
  • import 4;
  • import 4;
  • import .data.;
  • import .data.;
  • import .data.;
  • import ;
  • import ;
  • import ;
  • /**
  • * redis订阅监听器
  • */
  • @Component
  • public class RedisMessageListener implements MessageListener {
  • private final Logger logger = (RedisMessageListener.class);
  • @Resource
  • private RedisTemplate redisTemplate;
  • @Resource
  • private IBusinessQueryConfigService businessQueryConfigService;
  • @Resource
  • private IBusinessExecuteLogService businessExecuteLogService;
  • @Override
  • public void onMessage(Message message, byte[] pattern) {
  • // 获取消息
  • byte[] messageBody = ();
  • // 使用值序列化器转换
  • Object msg = ().deserialize(messageBody);
  • // 获取监听的频道
  • byte[] channelByte = ();
  • // 使用字符串序列化器转换
  • Object channel = ().deserialize(channelByte);
  • ("---频道---: " + channel);
  • ("---消息内容---: " + msg);
  • BusinessExecuteLog executeLog = new BusinessExecuteLog();
  • JSONObject subData = (());
  • String taskId = (subData != null && ("taskId")) ? ("taskId") : "";
  • if ((taskId)) {
  • ("消息格式不正确,缺失taskId");
  • return;
  • }
  • // 加锁
  • Boolean flag = ().setIfAbsent(taskId, "1", 300, );
  • // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
  • if (!flag) {
  • return;
  • }
  • // 查询数据库,验证任务是否已执行
  • BusinessExecuteLog businessExecuteLog = (taskId);
  • if (businessExecuteLog != null) {
  • return;
  • }
  • try {
  • // 解析参数
  • (("taskId"));
  • (("configCode"));
  • (("createBy"));
  • (("createDate"));
  • JSONObject params = ("params");
  • (());
  • BusinessQueryConfig config = (("businessConfig"), BusinessQueryConfig.class);
  • // 执行业务
  • AjaxResult respVo = (config, params);
  • if (() == 0) {
  • ("成功");
  • (String.format("执行成功,最终执行脚本:%s", ()));
  • } else {
  • ("失败");
  • (());
  • }
  • } catch (Exception e) {
  • ();
  • // 若解析参数报错,则重新赋予taskId并保存接收的消息,方便后续排查问题
  • if ((())) {
  • (());
  • (String.valueOf(msg));
  • }
  • ("失败");
  • (());
  • } finally {
  • // 释放锁
  • redisTemplate.delete(taskId);
  • }
  • // 保存结果记录
  • (executeLog);
  • }
  • }