redis事务
Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:
- 批量操作在发送 EXEC 命令前被放入队列缓存。
- 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
- 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。
一个事务从开始到执行会经历以下三个阶段:
- 开始事务。
- 命令入队。
- 执行事务。
示例:
//根据multi开启事务
redis 127.0.0.1:6379> MULTI
OK redis 127.0.0.1:6379> SET book-name "Mastering C++ in 21 days"
QUEUED redis 127.0.0.1:6379> GET book-name
QUEUED redis 127.0.0.1:6379> SADD tag "C++" "Programming" "Mastering Series"
QUEUED redis 127.0.0.1:6379> SMEMBERS tag
QUEUED
//触发事务
redis 127.0.0.1:6379> EXEC
1) OK
2) "Mastering C++ in 21 days"
3) (integer) 3
4) 1) "Mastering Series"
2) "C++"
3) "Programming"
注:
单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。
redis 127.0.0.1:7000> multi
OK
redis 127.0.0.1:7000> set a aaa
QUEUED
redis 127.0.0.1:7000> set b bbb
QUEUED
redis 127.0.0.1:7000> set c ccc
QUEUED
redis 127.0.0.1:7000> exec
1) OK
2) OK
3) OK
如果在 set b bbb 处失败,set a 已成功不会回滚,set c 还会继续执行。
redis事务命令
multi:标记一个事务块的开始。
exec:执行所有事务块内的命令。
discard:取消事务,放弃执行事务块内的所有命令。
watch:监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch:取消 WATCH 命令对所有 key 的监视。
将日志记录到redis
1、需求
- 获取日志的产生的线程名称,记录器名称,上下文产生时间,日志发生时间,自定义日志的信息
- 将获取的信息以json的形式保存到redis中
2、思路
- 配置logback使用自定义Appender实现,来获取对应的日志信息
- 配置一个单列的redis工具类(不影响其他业务),将获取的日志信息保存起来
3、配置依赖
<!-- 日志:slf4j是接口,log4j是具体实现 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
<!-- 实现slf4j接口并整合 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<!--redis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
4、配置redis以及logback
a、logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--自定义日志-->
<appender name="MyAppender" class="log.RedisAppender" />
<root level="info">
<appender-ref ref="MyAppender" />
</root>
</configuration>
b、redis.properties
maxIdle=100
maxWait=3000
testOnBorrow=true
host=127.0.0.1
port=6379
timeout=3000
pass=你的密码
5、MyAppender
package log; import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import dto.log.LogData;
import redis.clients.jedis.Jedis;
import tool.JsonBuilder;
import tool.RedisBuilder; /**
* 自定义日志处理
*/
public class RedisAppender extends AppenderBase<LoggingEvent> { @Override
protected void append(LoggingEvent loggingEvent) {
//获取日志数据
LogData logData=new LogData(loggingEvent);
//设置日志保存数据库
Jedis jedis=RedisBuilder.getSingleJedis(2);
//设置日志的key
String key="logData";
//获取日志条数
Integer index=RedisBuilder.getRedisIndexByName(key);
//保存日志
jedis.set(key+index, JsonBuilder.getString(logData));
//关闭链接
jedis.close(); }
}
6、LogData
package dto.log; import ch.qos.logback.classic.spi.LoggingEvent;
import tool.DataFormatBuilder; /**
* Created by yuyu on 2018/3/15.
* 用于保存日志数据
*/
public class LogData { private String message;//日志的信息
private String loggerTime;//上下文产生时间
private String loggerName;//记录器名称
private String threadName;//线程名称
private String happenStamp;//日志发生时间 public LogData() {
} public LogData(LoggingEvent loggingEvent) { this.message=loggingEvent.toString();
this.loggerTime=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getContextBirthTime());
this.loggerName=loggingEvent.getLoggerName();
this.threadName=loggingEvent.getThreadName();
this.happenStamp=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getTimeStamp());
}
//getter,setter略
}
7、RedisBuilder
package tool; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig; import java.util.Properties; /**
* Created by yuyu on 2018/3/15.
* redis相关的操作,获取一个单例的连接池
*/
public class RedisBuilder { private static JedisPool jedisPool; private static RedisBuilder singleRedisBuilder=new RedisBuilder(); //单利模式
private RedisBuilder(){
//获取配置信息
Properties properties=PropertiesGetter.get("redis");
//设置连接池参数
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(Integer.parseInt(properties.getProperty("maxIdle")));//最大的jedis实例
config.setMaxWaitMillis(Integer.parseInt(properties.getProperty("maxWait")));//最大等待时间(毫秒)
config.setTestOnBorrow(Boolean.parseBoolean(properties.getProperty("testOnBorrow")));//借用时测试
//redis链接
jedisPool=new JedisPool(config, properties.getProperty("host")
, Integer.parseInt(properties.getProperty("port"))
, Integer.parseInt(properties.getProperty("timeout"))
, properties.getProperty("pass"));
} /**
* 从连接池中获取一个Jedis对象
* @param db 数据库[0,15]
* @return
*/
public static Jedis getSingleJedis(Integer db) {
if (db==null||(db<0&&db>15)){
return null;
}
Jedis back=jedisPool.getResource();
back.select(db);
return back;
} /**
*传入名称获取保存在redis中的Index号码
* @param name
* @return
*/
public static Integer getRedisIndexByName(String name){
if (name==null){
return null;
}
Jedis jedis=RedisBuilder.getSingleJedis(15);
Integer index;
String value=jedis.get(name);
//获取保存的index数据,没有的时候取0
if (null==value){
index=0;
}else{
index =Integer.parseInt(value)+1;
}
//将index保存
jedis.set(name,index.toString());
jedis.close();
return index;
}
}
8、DateFormatBuilder
package tool; import java.text.SimpleDateFormat;
import java.util.Date; /**
* Created by yuyu on 2018/3/15.
* 用于日期处理相关函数
*/
public class DataFormatBuilder { /**
* 根据传进来的时间戳 获取对应的时间格式
* @param format 时间格式
* @param stamp 时间戳
* @return
*/
public static String getTimeStampFormat(String format,Long stamp){
if (stamp==null){
return null;
}
if (format==null){
format="yyyy-MM-dd HH:mm:ss/SSS";
}
SimpleDateFormat df = new SimpleDateFormat(format);//设置日期格式
return df.format(stamp);//传进来的时间戳为获取当前系统时间
}
}
9、PropertiesGetter
package tool; import exception.DobeoneException; import java.io.IOException;
import java.io.InputStream;
import java.util.Properties; /**
* Created by yuyu on 2018/3/12.
* 用于参数配置文件中的数据获取
*/
public class PropertiesGetter {
/**
* 获取配置文件的配置信息
* @param name
* @return
*/
public synchronized static Properties get(String name){
String file="/properties/"+name+".properties";
Properties prop = new Properties();
InputStream in = PropertiesGetter.class.getResourceAsStream(file);
try {
prop.load(in);
} catch (IOException e) {
throw new DobeoneException("获取配置文件异常!-file-"+file,e);
}
return prop;
}
}
10、JsonBuilder
package tool; import com.fasterxml.jackson.databind.ObjectMapper;
import exception.DobeoneException; /**
* Created by yuyu on 2018/3/15.
* json相关的函数
*/
public class JsonBuilder {
/**
* 将一个实体类转换成json字符串
* @param object
* @return
*/
public static String getString(Object object){
//安全判断
if (object==null){
return null;
}
ObjectMapper mapper = new ObjectMapper();
String back;
try{
back=mapper.writeValueAsString(object);
}catch (Exception e){
//抛出一个自定义异常
throw new DobeoneException("json字符转换失败!-object-"+object,e);
}
return back;
}
}
测试:
package tool; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; /**
* Created by yuyu on 2018/3/15.
* 测试日志到redis
*/
public class TestLogger { private Logger logger= LoggerFactory.getLogger(this.getClass()); @Test
public void testLogger(){
logger.info("hahha");
}
}
redis分布式锁
一般生产环境都是服务器集群,那么如果希望某个操作只能由一台服务器去运行,那么如何实现呢?例如本人之前做过的一个需求,通过kettle将数据同步到我们的数据库中,这个同步是每天凌晨3点,每天一次,数据到达我们数据库之后我们要进行分发,并做后续的处理。因此这个分发就只能由一台服务器去操作,如果同时有多台服务器分发,就会出现任务重复的问题。
1、分布式锁要求
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
2、分布式锁
a、添加依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
b、代码实现
public class RedisTool { private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX"; /**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
//第一个为key,我们使用key来当锁,因为key是唯一的;
//第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,
通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
//第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
//第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
//第五个为time,与第四个参数相呼应,代表key的过期时间。
//总结:利用redis单个操作的原子性,将加锁操作放在一条命令中
//1、String set(String key, String value)
//2、String set(String key, String value, String nxxx)
//3、String set(String key, String value, String nxxx, String expx, int time)
//4、String set(String key, String value, String nxxx, String expx, long time)
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false; } }
关于分布式锁的各种问题可以参考这篇文章:Redis分布式锁的正确实现方式(Java版)