介绍如何在 Spring Boot 中使用 Redis List 的
BRPOPLPUSH
命令来实现一个线程安全且可靠的消息队列。
整合Redis
整合Redis
用Redis的List实现消息队列
Redis的List相关指令
**「LPUSH key element [element ...]
」**把元素插入到 List 的首部,如果 List 不存在,会自动创建。
「BRPOPLPUSH source destination timeout
」
移除并且返回 List (source)尾部的最后一个元素,并且同时会把这个元素插入到另一个 List (destination)的首部。
当 source List 中没有元素时,Redis 会阻塞连接,直到有其他客户端向其推送元素或超时。超时时间(秒)为 0 表示永远不超时。
注意,这个命令是 「原子性」 的,也就是说只要客户端获取到了返回的元素,那么这个元素一定就会在 destination List 有备份。这是实现可靠消息队列的关键!
**「RPOPLPUSH source destination
」**同上,它是 BRPOPLPUSH
命令的 「非阻塞」 版,如果 List 中没有元素就会立即返回 null
。
**「LREM key count element
」**从 List 中删除元素,count 的值不同,删除的方式也不同:
- count > 0:从头到尾开始搜索,删除与 element 相等的元素,最多删除 count 个。
- count < 0:从尾到头开始搜索,删除与 element 相等的元素,最多删除 count (绝对值)个。
- count = 0:删除所有与元素相等的元素。
实现思路
一个简单易用且可靠的消息队列:
- 生产者使用
LPUSH
命令往消息队列生产消息 - 消费者使用
BRPOPLPUSH
命令从队列消费消息,并且还会在获取并返回消息的时候把该消息推送到另一个消息队列,也就是 Pending 队列,这个队列中存储的就是未被消费者 ACK 的消息 - 消费者成功消费完毕后,使用
LREM
命令从 Pending 队列中删除这条消息,整个消费过程结束 - 如果消费者在消费过程中出现异常、宕机,那么需要在恢复后从 Pending 队列中获取到这条消息,再进行重新消费,从而保证了消息队列的可靠性,不会丢失消息(可能存在重复消费,需要做好幂等处理)
在 Spring Boot 中实现
Redis 队列 Key 常量
/**
* Redis 常量
*/
public class RedisConstants {
// 消息队列
public static final String QUEUE_NAME = "queue_orders";
// pending 队列,即待确认消息的队列
public static final String PENDINGQUEUE_NAME = "pending_queue_orders";
}
消费者
创建一个 OrderConsumer
Bean 模拟从队列中消费订单 ID。
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.keepc.springredis.config.JsonRedisTemplate;
import com.keepc.springredis.constant.RedisConstants;
/**
* 模拟从队列中消费订单 ID
* OrderConsumer 实现了 ApplicationRunner 接口,在应用就绪后创建新的消费线程进行消费。
*/
@Component
public class OrderConsumer implements ApplicationRunner, Runnable {
static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);
@Autowired
JsonRedisTemplate jsonRedisTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
// 应用启动后,创建新的线程来执行消费任务
Thread thread = new Thread(this);
thread.setName("order-consumer-thread");
thread.start();
}
@Override
public void run() {
while (true) {
try {
// 1:消费者,从队列未弹出消息,并推送到 pending 队列,整个过程是原子性的
// 最多阻塞 5 秒,超过 5 秒后还没有消息,则返回 null
// stringRedisTemplate.opsForList().rightPopAndLeftPush 该方法底层调用的正是 brpoplpush 命令
Object item = jsonRedisTemplate.opsForList().rightPopAndLeftPush(RedisConstants.QUEUE_NAME,
RedisConstants.PENDINGQUEUE_NAME, 5,
TimeUnit.SECONDS);
if (item == null) {
log.info("等待消息 ...");
continue;
}
try {
// 2:解析为 Long
Long orderId = Long.parseLong(String.valueOf(item));
// 模拟消息消费
log.info("消费消息: {}", orderId);
} catch (Exception e) {
log.error("消费异常:{}", e.getMessage());
continue;
}
// 3:消费成功,从 pending 队列删除记录,相当于确认消费
// 底层正是 LREM 命令
jsonRedisTemplate.opsForList().remove(RedisConstants.PENDINGQUEUE_NAME, 0, item);
} catch (Exception e) {
log.error("队列监听异常:{}", e.getMessage());
break;
}
}
log.info("退出消费");
}
}
OrderConsumer
实现了 ApplicationRunner
接口,在应用就绪后创建新的消费线程进行消费。
stringRedisTemplate.opsForList().rightPopAndLeftPush
方法从 queue
队列消费一条消息,同时把消息添加到 pendingQueue
队列。该方法底层调用的正是 brpoplpush
命令,最多阻塞 5 秒,超时后返回 null
。
得到消息后解析为 Long
类型,模拟消费,即输出到日志。如果消费成功,则调用 stringRedisTemplate.opsForList().remove
方法(底层正是 LREM
命令)从 pendingQueue
队列中删除消息。如果消费失败,失败的消息会在 pendingQueue
队列中继续存在,不会丢失,可以重新投递消费或者是人工处理。
生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.keepc.springredis.config.JsonRedisTemplate;
import com.keepc.springredis.constant.RedisConstants;
/**
* 订单生产者
*/
@Component
public class OrderProducer {
static final Logger log = LoggerFactory.getLogger(OrderProducer.class);
@Autowired
JsonRedisTemplate jsonRedisTemplate;
/**
* 发送订单到队列
*
* @param orderId 订单号
*/
public void send(Long orderId) {
if (orderId == null) {
return;
}
log.info("发送订单到队列:{}", orderId);
jsonRedisTemplate.opsForList().rightPush(RedisConstants.QUEUE_NAME, orderId);
}
}
测试
@SpringBootTest
class SpringRedisApplicationTests {
static final Logger logger = LoggerFactory.getLogger(SpringRedisApplicationTests.class);
// 注入 JsonRedisTemplate
@Autowired
JsonRedisTemplate jsonRedisTemplate;
@Autowired
OrderProducer orderProducer;
/**
* 进行消息队列测试的函数。
* 测试中,向消息队列发送四个随机长整型消息,然后等待3秒。
* 使用了JUnit的@Test注解标识这是一个测试方法,并通过@Timeout注解设置了方法执行的最长时间为10秒。
*/
@Test
@Timeout(value = 10, unit = TimeUnit.SECONDS)
public void mqTest() throws InterruptedException {
// 发送一个空消息到消息队列
orderProducer.send(null);
// 创建一个随机数生成器
Random random = new Random();
// 分别发送三个随机生成的长整型消息到消息队列
orderProducer.send(random.nextLong(0L, 10000000L));
orderProducer.send(random.nextLong(0L, 10000000L));
orderProducer.send(random.nextLong(0L, 10000000L));
// 暂停3秒,以便观察消息队列处理情况
TimeUnit.SECONDS.sleep(3L);
}
}
结果:
2024-04-26T16:47:52.626+08:00 INFO 5456 --- [ main] c.k.springredis.mqByList.OrderProducer : 发送订单到队列:6514813
2024-04-26T16:47:52.706+08:00 INFO 5456 --- [ main] c.k.springredis.mqByList.OrderProducer : 发送订单到队列:1380282
2024-04-26T16:47:52.742+08:00 INFO 5456 --- [ main] c.k.springredis.mqByList.OrderProducer : 发送订单到队列:5429620
2024-04-26T16:47:52.905+08:00 INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer : 消费消息: 6514813
2024-04-26T16:47:52.925+08:00 INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer : 消费消息: 5429620
2024-04-26T16:47:52.935+08:00 INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer : 消费消息: 1380282
2024-04-26T16:47:55.934+08:00 WARN 5456 --- [ionShutdownHook] d.r.c.l.LettucePoolingConnectionProvider : LettucePoolingConnectionProvider contains unreleased connections
2024-04-26T16:47:55.973+08:00 ERROR 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer : 队列监听异常:Redis exception
2024-04-26T16:47:55.973+08:00 INFO 5456 --- [consumer-thread] c.k.springredis.mqByList.OrderConsumer : 退出消费
Redis 的持久化方式
Redis 是一个内存数据库,为了保证数据的安全不丢失,它提供了两种数据备份(持久化)方式,即 「RDB」 和 「AOF」。
- 「RDB」:生成某一时刻的数据快照,通过子进程进行备份,数据可能不完整(取决于备份周期)。RDB 是 Redis 默认的持久化方式。它会在特定的时间间隔内将内存中的数据集快照写入磁盘,生成一个dump.rdb文件。
- 「AOF」:通过记录执行的指令到文件来实现数据备份,相对完整性较高,但是会记录每一条执行命令,性能会有一定影响。
这就需要根据你的业务场景来选择合适的持久化方式,也可以同时配合使用 「RDB」 和 「AOF」 两种方式,兼顾性能和数据安全。