package com.***.;
import ;
import com.***.jcpt.common.;
import com.***.jcpt.common.;
import com.***.;
import com.***.;
import com.***.;
import com.***.;
import 4;
import 4;
import .data.;
import .data.;
import .data.;
import ;
import ;
import ;
/**
* redis订阅监听器
*/
@Component
public class RedisMessageListener implements MessageListener {
private final Logger logger = (RedisMessageListener.class);
@Resource
private RedisTemplate redisTemplate;
@Resource
private IBusinessQueryConfigService businessQueryConfigService;
@Resource
private IBusinessExecuteLogService businessExecuteLogService;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = ();
// 使用值序列化器转换
Object msg = ().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = ();
// 使用字符串序列化器转换
Object channel = ().deserialize(channelByte);
("---频道---: " + channel);
("---消息内容---: " + msg);
BusinessExecuteLog executeLog = new BusinessExecuteLog();
JSONObject subData = (());
String taskId = (subData != null && ("taskId")) ? ("taskId") : "";
if ((taskId)) {
("消息格式不正确,缺失taskId");
return;
}
// 加锁
Boolean flag = ().setIfAbsent(taskId, "1", 300, );
// 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
if (!flag) {
return;
}
// 查询数据库,验证任务是否已执行
BusinessExecuteLog businessExecuteLog = (taskId);
if (businessExecuteLog != null) {
return;
}
try {
// 解析参数
(("taskId"));
(("configCode"));
(("createBy"));
(("createDate"));
JSONObject params = ("params");
(());
BusinessQueryConfig config = (("businessConfig"), BusinessQueryConfig.class);
// 执行业务
AjaxResult respVo = (config, params);
if (() == 0) {
("成功");
(String.format("执行成功,最终执行脚本:%s", ()));
} else {
("失败");
(());
}
} catch (Exception e) {
();
// 若解析参数报错,则重新赋予taskId并保存接收的消息,方便后续排查问题
if ((())) {
(());
(String.valueOf(msg));
}
("失败");
(());
} finally {
// 释放锁
redisTemplate.delete(taskId);
}
// 保存结果记录
(executeLog);
}
}