RedisMessageListenerContainer 监听redis的key的变化
maven依赖
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
配置文件
redis:
database: 0
# Redis服务器地址
host: 127.0.0.1
# Redis服务器连接端口
port: 8973
# Redis服务器连接密码(默认为空)
password: xxxx@2023
# 连接超时时间(毫秒)
timeout: 1000
listen:
# 监听新增或者修改事件(这里是两个短下划线)
keyspace: __keyspace@${spring.redis.database}__:house:floor:*
的监听流程:
用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。
当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,MessageListener通过 onMessage 方法拿到消息后,自行处理。
package com.example.demo.mqttRedisListener.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
/**
* redis监听器:监听redis的key的变化
*/
@Configuration
@Slf4j
public class RedisMessageListener {
@Value("${}")
public String patternKeyspace;
@Value("${}")
public String host;
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定
*
* @param redisConnectionFactory Redis 连接的线程安全工厂
* @param topicMessageListener 实际处理的监听器
* @return
*/
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory,
MyTopicMessageListener topicMessageListener) {
log.info("pattern_set:" + patternKeyspace);
log.info("当前redis:" + host);
/*
模式 __keyspace@0__:xx:xxx_xxs:*
在 Redis 中 __keyspace@0__ 是一个特殊的键空间,用于监听键的事件。
0 是 Redis 数据库的索引,表示监听的是索引为 0 的数据库。
xx:xxx_xxs:* 是一个模式匹配的键,* 表示匹配任意字符。
监听 Redis 数据库索引为 0 的数据库中以 xx:xxx_xxs: 开头的键的新增或修改事件。当有键被新增或修改时,该监听器会触发相应的事件处理逻辑。
*/
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(redisConnectionFactory);
Topic topic_set = new PatternTopic(patternKeyspace);
/*RedisMessageListenerContainer 用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。
当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,
MessageListener通过 onMessage 方法拿到消息后,自行处理。*/
listenerContainer.addMessageListener(topicMessageListener, topic_set);
return listenerContainer;
}
}
模式匹配
模式 (这里是两个短下划线) __keyspace@0__:rx:xxx_xxs:*
- “——keyspace@0——” : 在 Redis 中是一个特殊的键空间,用于监听键的事件。
- 0 是 Redis 数据库的索引,表示监听的是索引为 0 的数据库。
- rx:xxx_xxs:* 是一个模式匹配的键,* 表示匹配任意字符。
作用:监听 Redis 数据库索引为 0 的数据库中以 rx:xxx_xxs:开头的键的事件。键发生新增、修改、过期等redis操作,该监听器会触发相应的事件处理逻辑。
3.自定义监听器MessageListener
通过 onMessage 方法拿到消息后,自行处理,写入我们的处理逻辑。
@Component
@Slf4j
public class MyTopicMessageListener implements MessageListener {
@Autowired
private MqttSendService mqttSendService;
@Autowired
private RedissonClient redissonClient;
/**
* 发布 事件
*
* @param message 监听key的操作类型 主要是设值和过期。set ,expired ,del
* @param bytes 监听的主题
*/
@Override
public void onMessage(Message message, byte[] bytes) {
String key = new String(message.getBody());//set,expired,del
String type = new String(message.getChannel());//__keyspace@0__:house:floor:cc
String redisKey = type.substring(15);//house:floor:cc
if (key.contains(RedisCommndEnum.SET.getCommand())) {
log.info("收到key设值(更新)的消息:操作{},redisKey={}", key, redisKey);
//查询
String content = (String) redissonClient.getBucket(redisKey).get();
//把redis的分隔符:替换成 mqtt的分隔符/
String topic = redisKey.replace(":", "/");
//发送到mqtt
mqttSendService.sendToMqtt(topic, 0, content);
log.info("发送到mqtt,topic={},content={}", topic, content);
} else if (key.contains(RedisCommndEnum.EXPIRED.getCommand())) {
log.info("收到key过期的消息:操作{},redisKey={}", key, redisKey);
//...todo
}
}
}