Redis篇之操作、lettuce客户端、Spring集成以及Spring Boot配置

时间:2022-01-07 05:02:49

Redis篇之操作、lettuce客户端、Spring集成以及Spring Boot配置

目录

一、Redis简介

1.1 数据结构的操作

1.2 重要概念分析

二、Redis客户端

2.1 简介

2.2 连接

2.3 基本用法

2.4 同步与异步

2.5 消费RedisFuture<T>

2.6 使用消费者监听器

2.7 发布订阅(Pub/Sub)

2.8 事务(Transaction)

2.9 主从复制(Master/Replica)

2.10 集群

三、Spring Data Redis操作

3.1 简介

3.2 配置Lettuce连接

3.3 主写从读模式配置

3.4 Redis哨兵模式(Sentinel)

3.5 发布订阅(Pub/Seb)

3.6 事务(Transaction)

3.7 流水线(Pipelining)

3.8 集群(Cluster)

3.9 序列化与反序列化

四、源码浅析

4.1 RedisTemplate

4.2 Operations类和Commands类

4.3 数据结构

4.4 SessionCallback接口

4.5 RedisCallbacke接口

4.6 总结

五、Spring Boot整合Redis

5.1 pom.xml文件

5.2 spring-boot-autoconfigure配置

5.3 RedisProperties类

5.4 LettuceConnectionConfiguration类

5.5 RedisAutoConfiguration类

六、总结

七、附录 相关网址

一、Redis简介

官方给出的定义是:

  • Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。
  • 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。
  • Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions)
  • 提供不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)

那么接下来,逐步分析官方给出的定义的重要概念。

1.1 数据结构的操作

下面简单介绍常见命令:

(1)字符串(strings):

  • 添加:set key value
  • 取值:get key
  • (整型)递增: incr key
  • (多值)添加:mset key1 val1 key2 val2 key3 val3
  • (多值)取值:mget key1 key2 key3
  • 修改:set key newVal
  • 查询(是否存在):exists key
  • 删除:del key
  • 查询类型:type key
  • (创建值后)设置超时(time时间后将key对应值删除):expire key time
  • (创建值时)设置超时:set key val ex time
  • 去除超时:persist key
  • 查看超时剩余时间:ttl key

(2)散列(hashes):

  • 添加多值:hmset yourhash field val [field val ...]
  • 添加单值:hset yourhash field val
  • 取多值:hmget yourhash field [field ...]
  • 取单值:hget yourhash field
  • 取全值: hgetall yourhash
  • 删除值:hdel yourhash field [field ...]

(3)列表(lists)

  • 链表左边添加:lpush list val
  • 链表右边添加:rpush list val
  • 范围内取值:lrange list index_start index_end
  • 截取范围内值:ltrim list index_start index_end
  • 添加多值:rpush list val1 val2 val3 val4
  • 左边删除:lpop list
  • 右边删除: rpop list
  • 阻塞式访问左删除:blpop list 或者 blpop list1 list2 list3
  • 阻塞式访问右删除:brpop list 或者 brpop list1 list2 list3
  • 原子性地返回并移除存储在 list1 的列表的最后一个元素(列表尾部元素), 并把该元素放入存储在 list2的列表的第一个元素位置(列表头部) : rpoplpush list1 list2
  • 阻塞版RPOPLPUSH:brpoplpush list1 list2

(4)集合(sets): String 的无序排列 , 适合用于表示对象间的关系 。

  • 添加: sadd myset val1 val2 val3
  • 查询元素:smembers myset
  • 查询特定元素: sismember myset val
  • 删除(随机):spop myset

(5)有序集合(sorted sets)

  • 添加(更新):zadd mysortedset score val [score val ...]
  • 范围内取值:zrange mysortedset score_begin score_end
  • 取索引值:zscore mysortedset val
  • 删除索引值最大的值: zpopmax mysortedset
  • 删除索引值最小的值: zpopmin mysortedset

(6)bitmaps:不是实际的数据结构,而是一个字符串类型定义的面向比特的集合。

  • 添加值:setbit key offset [offset ...]
  • 取值:getbit key offset

(7)hyperloglogs:是一种概率的数据结构,用于计算唯一的数据。

  • 添加:pfadd key element [element ...]
  • 合并: pfmerge key key1 key2

想详细了解可通过点击下面链接:

redis官方命令大全

https://redis.io/commands

1.2 重要概念分析

(1)复制(基于主从结构)

机制:

  • 当主从连接正常时,master通过传输命令流来保持slave的数据同步;
  • 当主从连接由于网络原因或连接超时出现中断时,slave重新连接并试图部分重新同步(同步中断之后的数据);
  • 当部分同步无效后,slave将会申请完全同步。

特性:

  • Redis采用异步复制,而主从也是异步地确认需要处理的数据量;
  • 一个master可以有多个slave;
  • Slave可以连接其他slave;
  • 在master处,复制是非阻塞式的;
  • 在slave处,大部分复制也是非阻塞式的;
  • 复制可以用在实现Redis系统伸缩性,让多个slave负责提供只读查询,也可以用于提高数据安全和系统高可用性。
  • 可以使用复制来避免master将全部数据写入磁盘的开销。

(2)事务

保证机制:

  • 事务中所有命令都被序列化并会按序执行,并不可打断;
  • Readis事物是原子性的,若执行则全部执行,若失败则全部失败。

使用:

  • MULTI命令开启事务;
  • EXEC命令执行事务。

(3)持久化

持久化方式:

  • RDB持久化方式能够在指定的时间间隔进行数据集快照存储;
  • AOF持久化方式会记录每一个sever的写操作,当server重启时,将重新执行记录的写操作构建原始数据集;
  • 如果只希望数据存在于server运行时,可以不进行持久化;
  • 可以结合RDB和AOF进行持久还。

(4)哨兵(Sentinel)

功能:

  • 监控(monitoring):Sentinel不断检查master和slave以确保他们按预期运行;
  • 提醒(notification):当被监控的Redis出现问题时,将会通过api向用户或者另一个程序发送通知;
  • 自动故障转移(automatic failover):当master失效时,Sentinel会开启故障转移处理,将一个slave提升为master,原依附于故障master的slaves将重新配置依附于新的master,并在使用Redis server的应用连接时,告知其使用master的新地址;
  • 配置提供者(Configuretion provider):Sentinel充当客户端(clilent)服务发现的根据来源,客户端连接Sentinel,为一个指定的服务请求当前Redis master响应的地址(若发生故障转移,则会向所有连接Sentinel的客户端告知master的新地址)。

(5)分区(Partitioning)

目的:

  • 使用多个计算机提供的内存总和来构建更大的数据库(若没有分区,用户将只能使用单台计算机提供的内存量);
  • 拓展计算能力到多核和多计算机,将网络带宽拓展到多计算机和多适配器。

方式:

  • 客户端分区:客户端直接在正确的节点读取或写入key(大部分客户端已实现);
  • 代理分区:客户端向代理端请求数据,由代理端访问正确的节点;
  • 查询路由:客户端向一个随机实例请求查询,该实例会将请求转移到正确节点上。

二、Redis客户端

现在用得比较多的我比较关注的Redis Java客户端是Jedis和Lettuce,而在Spring Data框架以及Spring Boot中也是使用这两种客户端。因此,我将关注与Spring Data是如何整合Redis(实现Redis的操作和功能)以及Spring Boot中是如何实现自动配置的(包括提供的配置项)。在Jedis和Lettuce中,我对Lettuce更感兴趣(可能由于我比较喜欢吃生菜吧),所以,接下来的内容,将围绕者Lettuce客户端进行分析,以及之后的框架整合主要也是以lettuce为主。

2.1 简介

Lettuce是可拓展的线程安全的Redis客户端,提供同步、异步和响应式APIs。如果避免使用阻塞和事务性操作(例如,BLPOP和MULTI/EXEC),多个线程可以共享一个连接。Nttey的nio框架对多个连接提供了有效的管理。支持Redis的高级特性,例如哨兵(Sential)、集群(Cluster)以及Redis数据结构。

2.2 连接

Lettuce中RedisURI是创建连接的关键,那么,接下来,看看RedisURI是怎么创建的。

(1)构建方式:

  • 使用uri:RedisURI.create("redis://localhost/");
  • 使用Builder:RedisURI.Builder.redis("localhost",6379).auth("password").database(1).build();
  • 直接使用构造方法:new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

(2)RedisURI句式:

  • Redis standalone模式:redis :// [: password@] host [: port] [/ database][? [timeout=timeout[d|h|m|s|ms|us|ns]] [&_database=database_]]
  • Redis standalone模式 (SSL):rediss :// [: password@] host [: port] [/ database][? [timeout=timeout[d|h|m|s|ms|us|ns]] [&_database=database_]]
  • Redis哨兵模式:redis-sentinel :// [: password@] host1[: port1] [, host2[: port2]] [, hostN[: portN]] [/ database][?[timeout=timeout[d|h|m|s|ms|us|ns]] [&_sentinelMasterId=sentinelMasterId_] [&_database=database_]

2.3 基本用法

RedisClient client = RedisClient.create(redisURI); (1)
StatefulRedisConnection<String, String> connection = client.connect(); (2)
RedisCommands<String, String> commands = connection.sync(); (3)
String value = commands.get("key"); (4)
...
connection.close(); (5)
client.shutdown(); (6)

(1)根据给出的redisURI(创建方法在2.2节)创建Redis客户端(默认连接6379端口);

(2)打开Redis standalone模式(模式由给出的RedisURI决定的,即redis、rediss和redis-sentinel的区别)连接。

(3)获得同步执行的命令API;

(4)发出一个GET命令获取“foo”对应的值;

(5)关闭连接;

(6)关掉客户端。

2.4 同步与异步

(1)RedisFuture<T>和CompleteableFuture<T>简介

每一个异步的API命令的调用都会创建一个可以取消、等待和订阅的RedisFuture<T>。而一个RedisFuture<T>或CompleteableFuture<T>是一个指向值计算尚未完成的最初未知结果的指针。一个RedisFuture<T>提供异步和链接的操作。

异步通过RedisFuture<T>进行操作,同步直接通过同步执行命令进行操作。

(2)创建RedisFuture<T>(Lettuce中)

  • 获取同步执行命令API
//根据redisURI创建客户端
RedisClient client = RedisClient.create(redisURI);
//创建连接
StatefulRedisConnection<String, String> connection = client.connect();
//获取同步执行命令
RedisCommands<String, String> sync = connection.sync();
//发送get请求,获取值
String value = sync.get("key");
...
//关闭连接
connection.close();
//关掉客户端
client.shutdown();
  • 获取异步执行命令API
RedisClient client = RedisClient.create(redisURI);
//创建连接
StatefulRedisConnection<String, String> connection = client.connect();
//获取异步执行命令api
RedisAsyCommands<String, String> commands = connection.async();
//获取RedisFuture<T>
RedisFuture<String> future = commands.get("key");

2.5 消费RedisFuture<T>:

  • 没有设置超时(拉模式)
RedisFuture<String> future = commands.get("key");
//使用拉模式调用get方法,阻塞调用线程,直到计算结果完成,
//最坏的情况是线程一直阻塞
String value = future.get();
System.out.println(value);
  • 设置超时:

//获取异步执行api

RedisAsyncCommands<String, String> async = client.connect().async();

//发送set请求

RedisFuture<String> set = async.set("key", "value");

//发送get请求

RedisFuture<String> get = async.get("key");

//设置set超时

set.await(1, SECONDS) == true

set.get() == "OK"

//设置get超时

get.get(1, TimeUnit.MINUTES) == "value"

2.6 使用消费者监听器:

  • 非阻塞
//发送get请求
RedisFuture<String> future = commands.get("key");
//设置监听器
future.thenAccept(new Consumer<String>() {
//该方法将会在future.complete()方法执行后,自动执行
@Override
public void accept(String value) {
...
}
});
  • 阻塞同步
try {
RedisFuture<String> future = commands.get("key");
//设置超时
String value = future.get(1, TimeUnit.MINUTES);
System.out.println(value);
} catch (Exception e) {
//超时后,将抛出TimeoutException
e.printStackTrace();
}
  • 推模式
RedisFuture<String> future = commands.get("key");
//在future完成执行(即future.complete())时,将触发该方法
//这样的执行流程就是推模式
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println(value);
}
});

2.7 发布订阅(Pub/Sub)

  • 同步订阅
//创建发布订阅连接
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub()
//添加监听器
connection.addListener(new RedisPubSubListener<String, String>() { ... })
//获取同步发布订阅执行命令API
RedisPubSubCommands<String, String> sync = connection.sync();
//订阅channel通道信息
sync.subscribe("channel");
//向channel通道发送message信息,
//暂时没找到该命令,等后期补充
...
//下面是自定义的业务代码
...
  • 异步订阅
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub()
connection.addListener(new RedisPubSubListener<String, String>() { ... })
//获取异步发布订阅执行命令API
RedisPubSubAsyncCommands<String, String> async = connection.async();
//获取向通道channel订阅的future
RedisFuture<Void> futureSub = async.subscribe("channel");
//获取向通道channel发布message的future
RedisFuture<Void> futurePub = async.push("channel","message");
//自定义业务代码业务代码
...
  • Redis Cluster发布订阅
//创建Redis Cluster发布订阅连接
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub()
//向连接中添加监听器
connection.addListener(new RedisPubSubListener<String, String>() { ... })
//获取发布订阅同步执行代码
RedisPubSubCommands<String, String> sync = connection.sync();
//向连接中订阅channel通道信息
sync.subscribe("channel");
//自定义业务代码
...

2.8 事务(Transaction)

Lettuce通过WATCH, UWATCH,EXEC, MULTI 和DISCARD来控制事务(Transaction),同时允许同步、异步、响应式和集群使用事务。那么,下面将分析同步和异步事务。

  • 同步使用事务
//第一段代码
//创建同步连接
RedisCommands<String, String> redis = client.connect().sync();
//开启事务
redis.multi() //成功,则返回值为"OK"
redis.set(key, value) //未执行,返回为null
redis.exec() //执行事务,返回list("OK")
//第二段代码
RedisCommands<String, String> redis = client.connect().sync();
//开启事务
redis.multi() //成功,返回"OK"
redis.set(key1, value) //未执行,返回null
redis.set(key2, value) //未执行, 返回null
redis.exec() //事务执行成功,返回list("OK", "OK")
  • 异步使用事务
//获取异步执行命令API
RedisAsyncCommands<String, String> async = client.connect().async();
//获取发送开启事务的future
RedisFuture<String> multi = async.multi();
//执行future的set命令设置值
RedisFuture<String> set = async.set("key", "value");
//获取提交执行事务的future
RedisFuture<List<Object>> exec = async.exec();
//获取执行事务的结果
List<Object> objects = exec.get();
//获取set的执行结果
String setResult = set.get();
//测试事务操作与set操作结果是否一致
//即事务操作是否成功
objects.get(0) == setResult
  • 响应式使用事务
//创建响应式连接
RedisReactiveCommands<String, String> reactive = client.connect().reactive();
//开启事务
reactive.multi().subscribe(multiResponse -> {
//编写事务操作
reactive.set("key", "1").subscribe();
reactive.incr("key").subscribe();
//提交执行事务
reactive.exec().subscribe();
});

2.9 主从复制(Master/Replica)

(1)Redis standalone模式

//创建客户端
RedisClient redisClient = RedisClient.create();
//创建主从连接
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(),
RedisURI.create("redis://localhost"));
//从ReadFrom.MASTER_PREFERRED中读取并复制,
//ReadFrom.MASTER_PREFERRED是一个ReadFromMasterPreferred类实例的引用
connection.setReadFrom(ReadFrom.MASTER_PREFERRED);
System.out.println("Connected to Redis");
//关闭连接
connection.close();
//关掉客户端
redisClient.shutdown();

(2)Redis哨兵模式

RedisClient redisClient = RedisClient.create();
//创建哨兵模式主从复制连接
StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(),
RedisURI.create("redis-sentinel://localhost:26379,localhost:26380/0#mymaster"));
//从ReadFrom.MASTER_PREFERRED中读取并进行复制
connection.setReadFrom(ReadFrom.MASTER_PREFERRED);
System.out.println("Connected to Redis");
connection.close();
redisClient.shutdown();

2.10 集群(Cluster)

(1)lettuce对集群的支持

  • 支持所有的CLUSTER命令;
  • 基于命令见hash slot的命令路由;
  • 所选集群命令的高级抽象;
  • 多集群节点的命令操作;
  • 通过slot和host/port获取集群节点的直接连接
  • SSL和身份验证;
  • 周期性灯芯集群拓扑图;
  • 发布/订阅。

(2)使用NodeSelection API

//创建Redis集群的高级异步连接
RedisAdvancedClusterAsyncCommands<String, String> async = clusterClient.connect().async();
//使用NodeSelection API连接所有副本
AsyncNodeSelection<String, String> replicas = connection.slaves();
//从所有副本中获取所有的keys(密钥)
AsyncExecutions<List<String>> executions = replicas.commands().keys("*");
//遍历得到的keys
executions.forEach(result -> result.thenAccept(keys -> System.out.println(keys)));

(3)连接到一个集群

RedisURI redisUri = RedisURI.Builder.redis("localhost").withPassword("authentication").build();
//创建集群客户端
RedisClusterClient clusterClient = RedisClusterClient.create(rediUri);
//创建连接
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();
//获取同步执行命令api
RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync();
...
connection.close();
clusterClient.shutdown();

(4)连接到多个子节点的Redis集群

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);
//创建拥有多个子节点的集群客户端
RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));
//创建连接
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();
//获取同步执行命令api
RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync();
...
connection.close();
clusterClient.shutdown();

(5)开启周期性拓扑图更新

RedisClusterClient clusterClient = RedisClusterClient.create(RedisURI.create("localhost", 6379));
//创建周期性拓扑图更新操作的配置操作
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(10, TimeUnit.MINUTES)
.build();
//向客户端设置刚才配置好的操作
clusterClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.build());
...
clusterClient.shutdown();

(6)开启自适应拓扑图更新

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);
RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));
//配置自适应拓扑图更新操作
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enableAdaptiveRefreshTrigger(RefreshTrigger.MOVED_REDIRECT, RefreshTrigger.PERSISTENT_RECONNECTS)
.adaptiveRefreshTriggersTimeout(30, TimeUnit.SECONDS)
.build();
//向集群客户端中设置刚才配置好的操作
clusterClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.build());
...
clusterClient.shutdown();

(7)获取一个节点

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);
//创建集群客户端
RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));
//创建连接
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();
//获取指定节点的同步执行命令api
RedisClusterCommands<String, String> node1 = connection.getConnection("host", 7379).sync();
...
//不需要关闭节点连接
connection.close();
clusterClient.shutdown();

三、Spring Data Redis操作

3.1 简介

关注于当前最新的集成版本是Spring Data Redis,该框架通过对Jedis和Lettuce的封装集成,将所有的Redis操作和数据类型都封装成为了一个个单独的操作类(Operation)和集合类(例如,RedisList,RedisMap等),同时提供各配置类(Configuration)来让用户自定义期望得到的客户端,对用户提供高级抽象的客户端RedisTemplate,用户不必要了解底层如何实现,只需通过RedisTemplate对Redis进行操作即可,而在最新一版Spring Data Redis 2.1中,通过Lettuce支持了主写从读的设置,而这正是这一版我比较感兴趣的一点。

接下来的几节里,先从操作层面尝试Spring Data Redis是如何使用而实现Redis的功能特性,然后再从源码层面去看框架是如何封装Redis的特性和操作的,最后再讨论Spring Data Redis整合Redis的优点和不足。

3.2 配置Lettuce连接

在Spring Data中通过提供RedisStandaloneConfiguration和RedisSentinelConfiguration两个配置类,来提供Redis Standalone模式配置和Redis Sentinel模式配置。

  • Redis standalone模式配置
@Configuration
class LettuceConnectConfig {
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
//通过配置RedisStandaloneConfiguration实例来
//创建Redis Standolone模式的客户端连接创建工厂
//配置hostname和port
return new LettuceConnectionFactory(new RedisStandaloneConfiguration("server", 6379));
}
}

3.3 主写从读模式配置

@Configuration

class WriteToMasterReadFromReplicaConfiguration {

//配置Lettuce连接工厂

@Bean

public LettuceConnectionFactory redisConnectionFactory() {

LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()

.readFrom(SLAVE_PREFERRED) //配置从读

.build();

//配置hostname和port

RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration("server", 6379);

//通过配置创建Lettuce连接工厂

return new LettuceConnectionFactory(serverConfig, clientConfig);

}

}

3.4 Redis 哨兵模式

@Configuration
public class LettuceSentinelConfig{
@Bean
public RedisConnectionFactory lettuceConnectionFactory() {
//创建哨兵配置
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
.master("mymaster") //设置主机的NameNode
.sentinel("127.0.0.1", 26379) //配置哨兵的ip和端口
.sentinel("127.0.0.1", 26380);
return new LettuceConnectionFactory(sentinelConfig);
}
}

也可以在application.yml(项目配置文件)中通过配置spring.redis.sentinel.mater配置主节点的名字,和配置spring.redis.sentinel.nodes配置逗号分隔host:port的列表来配置节点。

3.5 发布订阅(Pub/Sub)

(1)发布(Publish)

在Spring Data Redis中,提供了两种方法进行发布和订阅,即低级的RedisConnection和高级的RedisTemplate,执行效果是一样的。那么,下面我们看看,他们的操作实例吧。

// 使用ReidsConnection发布信息
RedisConnection con = ...
//序列化发布的信息
byte[] msg = ...
//序列化发往的channel通道
byte[] channel = ...
//调用RedisConnection进行发布
//请注意,官方文档的该参数顺序出现颠倒
con.publish(channel, msg);
// 使用RedisTemplate进行序列化和发布
RedisTemplate template = ...
//该方法对序列化和发布进行了封装
//hello为channel通道名,
//world是要发送的message
template.convertAndSend("hello!", "world");

(2)订阅(Subscribe)

到目前为止,框架只提供低级的RedisConnection的subscribe(指定通道发布)和psubscribe(指定模式发布,即正则表达式)两种消息发布方法。而且在Spring Data Redis框架下,订阅操作是阻塞的,一旦开启订阅,一个connection将会等待消息,此时调用除了subscribe、psubscribe、unsubscribe和punsubscribe之外的命令,都会抛出异常。接下来,我们来看看,使用上有什么不同。

  • subscribe方法
//创建连接
RedisConnection con = ...
//创建序列化工具
RedisSerializer<String> stringSerializer = RedisSerializer.string();
//序列化通道
Byte[] channel = stringSerializer.serialize("hello");
//创建信息监听器,实现接口下onMessage方法
//在接收到redis发送过来的消息后执行
//使用RedisConnection进行订阅必须实现该方法
//接收的参数从源码上来看第二个参数都是Byte[]类型
MessageListener listener = (massage,channel)->System.out.println("Received message from "+channel);
//接下来便可以订阅消息
con.subscribe(listener, message);
  • psubscribe方法
//创建连接
RedisConnection con = ...
//创建序列化工具
RedisSerializer<String> stringSerializer = RedisSerializer.string();
//序列化正则表达式
Byte[] pattern = stringSerializer.serialize("hello*");
//创建信息监听器,实现接口下onMessage方法
//在接收到redis发送过来的消息后执行
//使用RedisConnection进行订阅必须实现该方法
//接收的参数从源码上来看第二个参数都是Byte[]类型
MessageListener listener = (massage,pattern)->System.out.println("Received message from "+pattern);
//接下来便可以使用正则表达式订阅消息
con.psubscribe(listener,pattern);

3.6 事务(Transaction)

(1)使用

RedisTemplate通过execute方法提供事务功能,但不保证所有的事务操作在同一个连接中进行。

//执行一次事务操作

//调用RedisTemplte的execute(SessionBack<T> session)方法

//需要实现SessionBack中的execute(RedisOperations<K,V> operations)方法

//使用lamda表达式

List<Object> txResults = redisTemplate.execute((RedisOperations<K, V> operations)->{

//开启事务

operations.multi();

//添加操作

operations.opsForSet().add("key", "value1");

//这个将会返回事务操作的所有结果

return operations.exec();

}

});

//打印结果

System.out.println("Number of items added to set: " + txResults.get(0));

(2)配置

//开启声明式事务管理
@Configuration
@EnableTransactionManagement
public class RedisTxContextConfiguration {
//配置RedisTemplate
@Bean
public StringRedisTemplate redisTemplate() {
StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory());
//显式启用事务支持
template.setEnableTransactionSupport(true);
return template;
}
//配置Redis连接工厂
@Bean
public RedisConnectionFactory redisConnectionFactory() {
// jedis || Lettuce
}
//配置事务管理器,但Spring Data Redis不提供事务管理器
//需要我们自己创建实现配置事务管理器
//若使用JDBC连接,则直接使用已存在的事务管理器
@Bean
public PlatformTransactionManager transactionManager() throws SQLException {
return new DataSourceTransactionManager(dataSource());
}
//配置数据源
@Bean
public DataSource dataSource() throws SQLException {
// ...
}
}

3.7 流水线(Pipelining)

Redis的pipelining功能,可以一次向server发送多个命令而不需要等待响应,之后通过一个步骤就可以获得所有响应。当需要连续发送多个命令时(如向同一个List添加多个元素),pipelining流水线功能将会提高应用性能。

该框架通过提供RedisTemplate的executePipelined方法,支持pipelining功能。下面来看看它的示例。

//从一个队列中删除指定数量的元素
List<Object> results = stringRedisTemplate.executePipelined(
//RedisCallback内部类
//实现doInRedis方法
//此处亦可以创建SessionCallack的内部类
//由用户自己决定
new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//创建连接
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
for(int i=0; i< batchSize; i++) {
//循环调用rpop进行batchSize次右删除
stringRedisConn.rPop("myqueue");
}
return null;
}
});

3.8 集群(Cluster)

Spring Data Redis 框架提供了Redis Cluster的配置类,即RedisClusterConfiguration类,提供两项spring.redis.cluster.nodes和spring.redis.cluster.max-redirects属性配置,配置集群节点和允许集群重定向的最大数量。可以通过该RedisClusterConfiguration类进行创建集群创建工厂,然后创建集群连接。

当然,在官方文档上,也提供了让我们自己配置以及读取创建连接工厂的方法,下面来看看怎么自定义创建。

//创建自定义集群配置属性
//从application的配置文件中
//读取前缀为spring.redis.cluster相应的属性
//注册成为Spring bean
@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class ClusterConfigurationProperties {
//集群节点列表
List<String> nodes;
public List<String> getNodes() {
return nodes;
}
public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
}
//配置创建集群连接工厂
@Configuration
public class LettuceClusterAppConfig {
//自动装配配置Bean
@Autowired
ClusterConfigurationProperties clusterProperties;
@Bean
public RedisConnectionFactory connectionFactory() {
//根据上面配置bean创建连接工厂
return new LettuceConnectionFactory(
new RedisClusterConfiguration(clusterProperties.getNodes()));
}
}

3.9 序列化与反序列化

Spring Data Redis提供了RedisSerializer接口,同时提供了多个实现该接口的序列化工具,如JdkSerializationRedisSerializer、Jackson2JsonRedisSerializer和GenericToStringSerializer等。通过调用其实例的serialize和deserialize方法进行序列化和反序列化。

四、Spring Data Redis源码

从上一章中,我们发现由多个常出现的由Spring Data Redis 封装了的类和操作。那么,这一章主要来看其是如何封装客户端操作的。

4.1 RedisTemplate

该类是Spring Data Redis提供给用户的*的抽象客户端,用户可直接通过RedisTemplate进行多种操作,那么,我们先来看看RedisTemplate封装了哪些操作。下面这列表是RedisTemplate的继承关系和所有方法(已过滤重载方法,共有81个方法)

//类继承关系
//RedisAccessor是RedisTemplate定义普通属性的基类,不直接使用
//RedisOperations是指定RedisTemplate实现的Redis connection操作的集合接口
//BeanClassLoaderAware是给其实现类是设置类加载器的接口
1.RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware
//方法
//配置默认序列化与反序列化工具类
2.afterPropertiesSet
//根据参数执行相关operation操作,例如,事务
3.execute
//执行pipelining流水线相关操作
4.executePipelined
//执行指定connection连接的相关操作
5.executeWithStickyConnection
//执行session内的execute方法
6.executeSession
//创建RedisConnection代理类
7.createRedisConnectionProxy
//connection连接的预处理
8.preProcessConnection
//结果的后处理,默认什么都不做
9.postProcessResult
//是否向RedisCallback暴露本地连接
10.isExposeConnection
//设置是否向RedisCallback暴露本地连接
11.setExposeConnection
//12到26都是设置和获取相关序列化工具类
12.isEnableDefaultSerializer
13.setEnableDefaultSerializer
14.getDefaultSerializer
15.setDefaultSerializer
16.setKeySerializer
17.getKeySerializer
18.setValueSerializer
19.getValueSerializer
20.getHashKeySerializer
21.setHashKeySerializer
22.getHashValueSerializer
23.setHashValueSerializer
24.getStringSerializer
25.setStringSerializer
26.setScriptExecutor
//27到34为私有方法,不对外提供使用
27.rawKey
28.rawString
29.rawValue
30.rawKeys
31.deserializeKey
32.deserializeMixedResults
33.deserializeSet
34.convertTupleValues
//执行事务
35.exec
36.execRaw
//删除操作
37.delete
//接触链接
38.unlink
//查看是否含有指定key
39.hasKey
40.countExistingKeys
//设置过期时间
41.expire
42.expireAt
//转换成字节流并向channel发送message
43.convertAndSend
//获取过期时间
44.getExpire
//根据传入的正则表达式返回所有的key
46.keys
//取消指定key的过期时间
47.persist
//移动指定的key和index到数据库中
48.move
//从键空间随机获取一个key
49.randomKey
//将指定key改成目标key
50.rename
//key不存在时,将指定key改成目标key
51.renameIfAbsent
//设置存储在指定key的类型
52.type
//检索存储在key的值的序列化版本
53.dump
//执行Redis的restore的命令
54.restore
//标记事务阻塞的开始
55.multi
//丢弃所有在multi之后发出的命令
56.discard
//观察指定key在事务处理开始即multi之后的修改情况
57.watch
//刷新先前观察的所有key
58.unwatch
//为key元素排序
59.sort
//关闭客户端连接
60.killClient
//请求连接客户端的相关信息和统计数据
61.getClientList
//更改复制配置到新的master
62.slaveOf
//将本机更改为master
63.slaveOfNoOne
//64到79都是获取相对应的操作
64.opsForCluster
65.opsForGeo
66.boundGeoOps
67.boundHashOps
68.opsForHash
69.opsForHyperLogLog
70.opsForList
71.boundListOps
72.boundSetOps
73.opsForSet
74.opsForStream
75.boundStreamOps
76.boundValueOps
77.opsForValue
78.boundZSetOps
79.opsForZSet
//设置是否支持事务
80.setEnableTransactionSupport
//设置bean的类加载器
81.setBeanClassLoader

4.2 Operations类和Commands类

在Spring Data Redis中,将对应的操作都封装成为了对应的Operations接口和实现类(在org.springframework.data.redis.core包中),有ListOperations接口、HashOperations接口、GeoOperatons接口、ClusterOperations接口、HyperLoglogOperations接口等等接口以及对应的DefaultListOperations类、DefaultHashOperations类等默认实现类。Command类是对redis命令的第一次封装,即由Redis的第三方Java客户端提供的,例如lettuce,这些Command类的命令就在在org.springframework.data.redis.connection.lettuce包内。

下面分析一下DefaultListOperations类:

  • leftPop方法源码
//左删除链表
public V leftPop(K key) {
return execute(new ValueDeserializingRedisCallback(key) {
@Override
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
//调用connection的lpop(rawKey)方法,
//即调用底层创建出的Lettuce连接或者Jedis连接的lpop方法
//获取同步或异步执行命令API,然后执行lpop方法
return connection.lPop(rawKey);
}
}, true);
}
  • leftPush方法源码
//左添加链表
//key为链表,value为值
public Long leftPush(K key, V value) {
//序列化链表名
byte[] rawKey = rawKey(key);
//序列化链表值
byte[] rawValue = rawValue(value);
//调用底层实现的Lettuce连接或者Jedis连接
//获取同步或异步执行命令API,然后执行lPush命令
return execute(connection -> connection.lPush(rawKey, rawValue), true);
}

与此相同, 其他的操作最后都会调用底层创建的Lettuce或Jedis连接,即LettuceConnection或JedisConnection,然后通过相应的方法获取相应的命令封装类即Commands类(例如,LettuceListCommands类),最后调用Command类的方法(例如,LettuceCommands类的lPush或lPop方法)进行操作。在此之外,还有LettuceClusterConnection类(集群连接)和LettuceSentinelConnection类(哨兵连接),同样的Jedis也有相同实现,不再赘言。

4.3 数据结构

  在org.springframework.data.redis.support下的collections包里实现了列表,散列表,有序集合,集合,以及有序集合,即DefaultRedisList、DefaultRedisMap、DefaultRedisSet以及DefaultRedisZSet等,都绑定了相应的绑定类(例如,列表的绑定类为BoundLustOperations)。

  在org.springframework.data.redis.support下的actomic包里,由Redis事务的watch和multi方法实现CAS的操作更新键值。同时,提供了Double、Integer和Long的原子操作类型。

4.5 SessionCallback接口

  该接口是Redistemplate的execute方法中的参数类型,通过实现该接口唯一方法,RedisTemplate的execute(sessionCallback)方法将会执行在实现方法中的所有操作。可以通过该方式实现事务,即通过执行multi,discard,exec,watch和unwatch命令实现。

4.6 RedisCallback接口

  该接口和SessionCallback接口一样,也是Redistemplate的execute方法中的参数类型,但是是低级的redis回调方法。使用方法和上面接口一样,该接口有一个exec方法,通常将其实现为匿名类给execute方法使用,而最常用是链接多个get/set/trim操作等等。

4.7 总结

  该框架实现了Spring和Redis的整合,提供了高级抽象的客户端以及相应的配置,降低了用户的入门成本,以上是这框架的优点。而在阅读该项目源码时,也发现在项目源代码结构上的一些不足,也或许是自己没理解代码者的行为吧。例如,org.springframework.data.redis.core包下的各类Operations操作类完全可以将其放入一个operations包内;org.springframework.data.redis.config包内居然存放的不是配置类,而是解析类。就目前而言我发现,mybatis和spring框架的源码文件和注释是最清晰的最整洁的,作为Spring下的项目都应该秉持着这样的态度和风格。

五、Spring Boot 整合redis

Spring Boot提供starter的来向用户提供完成自动配置的redis,那么,关于Spring Boot对redis的整合主要关心它所提供配置选项。下面从源码来看看是如何整合的。

5.1 pom.xml文件

  在spring-boot-starter-data-redis项目中,主要通过pom.xml进行项目依赖。下面是项目下的pom.xml依赖源码。

<dependencies>
//添加spring-boot-starter依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
//添加spring-data-redis依赖
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<exclusions>
//排除jcl-over-slf4j依赖
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
//添加Lettuce客户端依赖
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
</dependencies>

  而在spring-boot-starter依赖中又引入了许多相关的依赖,尤其是自动配置的依赖。

<dependencies>
//添加spring-boot依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
</dependency>
//添加spring-boot-autoconfigure项目依赖
//这个依赖很重要,和整合各式应用有关的配置都在该依赖下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
//添加日志依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
//添加注解依赖
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
//添加spring核心依赖,例如ioc、aop等等
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
//添加yaml依赖
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>

  下一节,重点分析在spring-boot-autoconfigure项目下的源码配置。

5.2 spring-boot-autoconfigure项目

  在该项目下的org.springframework.boot.autoconfigure.data.redis包内,提供了许多关于redis的配置,下面是该包下的截图。

Redis篇之操作、lettuce客户端、Spring集成以及Spring Boot配置

  由包中的源码可以得知,该包下提供了Jedis和Lettuce的客户端连接配置、Redis基本连接配置、响应式配置、Redis存储仓库配置以及Redis所有的配置属性。

5.3 RedisProperties类

  该类提供给用户使用,用户可以通过application文件配置Redis相关属性,下面是yml格式的配置列表。

spring:
redis:
database:
url:
host:
password:
port:
ssl:
timeout:
pool:
maxIdle:
minIdle:
maxActive:
maxWait:
sentinel:
master:
nodes:
cluster:
nodes:
maxRedirects:
jedis:
pool:
lettuce:
shutdownTimeout
pool

5.4 LettuceConnectionConfiguration类

  该配置类通过注解@ConditionalOnClass(RedisClient.class)进行bean化。类似注解解释如下(该解释摘自https://blog.csdn.net/blueheart20/article/details/81020262):

  • @ConditionalOnBean(仅仅在当前上下文中存在某个对象时,才会实例化一个Bean)
  • @ConditionalOnClass(某个class位于类路径上,才会实例化一个Bean),该注解的参数对应的类必须存在,否则不解析该注解修饰的配置类;
  • @ConditionalOnExpression(当表达式为true的时候,才会实例化一个Bean)
  • @ConditionalOnMissingBean(仅仅在当前上下文中不存在某个对象时,才会实例化一个Bean), 该注解表示,如果存在它修饰的类的bean,则不需要再创建这个bean;可以给该注解传入参数例如@ConditionOnMissingBean(name = “example”),这个表示如果name为“example”的bean存在,这该注解修饰的代码块不执行。
  • @ConditionalOnMissingClass(某个class类路径上不存在的时候,才会实例化一个Bean)
  • @ConditionalOnNotWebApplication(不是web应用)
  • @ConditionalOnProperty是指在application.yml里配置的属性是否为true,其他的几个都是对class的判断

  所以,在类路径上存在RedisClient才会bean化该类进行配置。

5.5 RedisAutoConfiguration类

  该类使用了四个注解:

  • @ConditionalOnClass(RedisOperations.class) :在类路径上存在RedisOperations.class时,才会bean实例化该类。
  • @EnableConfigurationProperties(RedisProperties.class) : 当@EnableConfigurationProperties注解应用到你的@Configuration时, 任何被@ConfigurationProperties注解的beans将自动被Environment属性配置,即RedisProperties.class类将会被Environment属性配置。
  • @Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class }):将LettuceConnectionConnfiguration和JedisConnectionConfiguration导入成为bean。
  • @Configuration:配置类。

  该类里连个注入方法都使用了@ConditionalOnMissingBean,故只有上下文不存在RedisTemplate或StringRedisTemplate时,才会创建对应的bean。其他的配置大同小异,有兴趣可以自己查阅源码。

六、总结

至此,Redis基本操作,Redis的Lettuce客户端,Spring Data Redis项目,以及Spring Boot 提供的配置方案的了解分析,就已经结束了。我个人认为Lettuce已经提供了比较完善的Redis的操作,并且更贴近于Redis的执行流程和思想,并且有更宽松的定制方案,不出意外的话,我会选择使用Lettuce客户端。但是,Spring Boot项目提供的Redis配置方案是比较完善的,如果想要自己整合Spring boot和Lettuce客户端,可以学习它的配置思想。

而Spring Data Redis项目就效果而言,应该是很不错的,但就其项目架构而言,算不上优美。不过,它的确降低了用户的学习门槛。文章若有不当之处,欢迎指教。

附录 相关网址

redis中文官方地址:http://www.redis.cn

reids英文官方地址(由于中文官方有些地方翻译的不是很明白,可以查询英文官方):https://redis.io/

lettuce官方地址:https://lettuce.io

Spring Data Redis官方地址:https://spring.io/projects/spring-data-redis

Spring Boot redis 自动配置源码地址:https://github.com/spring-projects/spring-boot/tree/master/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/redis