【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)

时间:2022-11-02 10:55:32

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!

分布式事务方案设计

实际运用理论时进行架构设计时,许多人容易犯“手里有了锤子,看什么都觉得像钉子”的错误,设计方案时考虑的问题场景过多,各种重试,各种补偿机制引入系统,导致系统过于复杂,落地遥遥无期。

  1. 有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。
  2. 设计分布式事务系统也不是需要考虑所有异常情况,不必过度设计各种回滚,补偿机制。
  3. 如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。

如果系统要实现回滚流程的话,有可能系统复杂度将大大提升,且很容易出现 Bug,估计出现 Bug 的概率会比需要事务回滚的概率大很多。

RocketMQ的事务消息实现

【分布式技术专题】「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(下)

代码演示

搭建RocketMQ

解压压缩包
unzip rocketmq-all-4.7.1-bin-release.zip
切换⽬录到RocketMQ根⽬录
cd rocketmq-all-4.7.1-bin-release
启动Name Server
nohup sh bin/mqnamesrv &
验证是否启动OK:
tail -f ~/logs/rocketmqlogs/namesrv.log
如果成功启动,能看到类似如下的⽇志:
2019-07-18 17:03:56 INFO main - The Name Server boot success. ...
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
验证是否启动OK:
tail -f ~/logs/rocketmqlogs/broker.log
如果启动成功,能看到类似如下的⽇志:
2019-07-18 17:08:41 INFO main - The broker[itmuchcomdeMacBook-Pro.local, 192.16
8.43.197:10911] boot success. serializeType=JSON and name server is localhost:9876
消息⽣产者代码
MAVEN依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test-group
发送者Java代码
Controller代码
@RestController
@RequestMapping("/admin/shares")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
private final ShareService shareService;
@PutMapping("/audit/{id}")
public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
return this.shareService.auditById(id, auditDTO);
}
}
Service代码
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
// 1. 查询share是否存在,不存在或者当前的audit_status != NOT_YET,那么抛异常
Share share = this.shareMapper.selectByPrimaryKey(id);
if (share == null) {
throw new IllegalArgumentException("参数⾮法!该分享不存在!");
}
if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
throw new IllegalArgumentException("参数⾮法!该分享已审核通过或审核不通过!");
}
// 3. 如果是PASS,那么发送消息给rocketmq,让⽤户中⼼去消费,并为发布⼈添加
if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
// 发送半消息
String transactionId = UUID.randomUUID().toString();
this.rocketMQTemplate.sendMessageInTransaction("add-bonus",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)// header也有妙⽤...
.setHeader("share_id", id).build(),auditDTO// arg有⼤⽤处 );
} else {
this.auditByIdInDB(id, auditDTO);
}
return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
// 4. 把share写到缓存
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
this.auditByIdInDB(id, auditDTO);
this.rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder().transactionId(transactionId).log("审核分享...").build());
}
Listener代码
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
private final ShareService shareService;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.get("share_id"));
try {
this.shareService.auditByIdWithRocketMqLog(shareId, (ShareAuditDTO)arg, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// select * from xxx where transaction_id = xxx
RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder()
.transactionId(transactionId).build());
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
消息消费者代码
Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
配置:
rocketmq:
name-server: 127.0.0.1:9876
代码
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
private final UserMapper userMapper;
private final BonusEventLogMapper bonusEventLogMapper;
@Override
public void onMessage(UserAddBonusMsgDTO message) {
// 1. 为⽤户加
Integer userId = message.getUserId();
Integer bonus = message.getBonus();
User user = this.userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);
// 2. 记录⽇志到bonus_event_log表⾥⾯
this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE")
.createTime(new Date()).description("投稿加积分..").build());
log.info("积分添加完毕...");
}
}