关于布隆过滤器,手写你真的知其原理吗?让我来带你手写redis布隆过滤器。

时间:2023-11-11 21:05:38

说到布隆过滤器不得不提到,redis,

redis作为现在主流的nosql数据库,备受瞩目;它的丰富的value类型,以及它的偏向计算向数据移动属性减少IO的成本问题。备受开发人员的青睐。通常我们使用redis作为数据缓存来使用,但是作为缓存redis会有一些问题,就是缓存穿透问题击穿雪崩一致性双写。本次主要讲解的就是穿透问题

首先我们先思考一下为什么会产生穿透的问题。

假设我们有一些数据,存储在了MySQL中,但是由于用户量的庞大我们需要在在用户访问数据的时候需要在redis中进行一个过滤、拦截,reids中存在则放行,不存在则直接拒绝;从而使用户不会过多的去操作数据库,减轻数据库的压力。但是此时就会有一个问题:

  • 我们如何保证redis在用户携带数据过来的时候进行一个判断呢,此时就需要写一个算法来将用户的数据进行一个拆解,计算来比对redis中已经存在的数据。到这里,我们理论上解决了数据的过滤问题。
  • 那么还有一个问题就是redis存储MySQL数据的时候如何存储呢,是将数据全部存储在redis中吗?如果是的话那么redis基于内存的一种nosql数据库,根本不可能存储那么多的数据量的啊?此时我们就需要利用redisbitmap的类型的特性。来进行数据的存储。

所谓的bitmap就是使用1Bit位来标记元素对应的value,而key就是该元素,想一下1Bytes是8个Bit,那么1个KB就是8192Bit,1M的话就是8388608Bit,可想而知,如果利用reids的bitmap处理大数据量的数据是不成问题的

缓存穿透的解决思路

基于以上的思路:整体的解决方案就是这样子的

  • 首先我们需要利用算法在项目启动的时候将需要缓存的数据加载到redis的bitmap中
  • 然后再写一个算法在用户访问的使用将数据进行拆解,比对redis的bitmap是否存在该条数据。存在放行,防止直接返回。

图解:

关于布隆过滤器,手写你真的知其原理吗?让我来带你手写redis布隆过滤器。

上述方案也可能存在一个漏掉的问题,误打误撞穿过去了,这种情况也不是不存在的。但是我们可以在穿透过去之后,在redis中加一个key,为这个error做一个标记,防止下一次再次穿透过去

注意

概率解决问题不可能百分之百解决问题>1% (No Silver Bullet)

  1. 你有什么
  2. 有的向bitmap标记
  3. 请求有可能被误标记
  4. 但是 一定概率减少数据放行 穿透
  5. 成本低

总结一句话:redis告诉你不存在的那么一定不存在,百分之百;但是redis告诉你有的,却不一定百分之百存在

大致的解决思路已经理清,接下来整理一下解决方案吧:

缓存穿透的解决方案

解决方案大概有三种:

关于布隆过滤器,手写你真的知其原理吗?让我来带你手写redis布隆过滤器。

首先我们先来实现第一种:(客户端实现bloom算法,自己承载bitmap)

方案一
public class test {
//位图的长度
public static final int NUM_SLOTS = 1024 * 1024 * 8;
//哈希函数的个数
public static final int NUM_HASH = 8;
//初始化位图
private static BigInteger bits = new BigInteger("0"); private static void addElement(String string) {
//增加元素将对应位图上的位置为1
//使用哈希函数计算哈希值:循环八次
for(int i = 0; i < NUM_HASH; i++){
int bit = hash(string, i);
if(!bits.testBit(bit)){
//BigInteger对象运行的必须是另外的BigInteger对象
//左移将对应位图上的位置为1
bits = bits.or(new BigInteger("1").shiftLeft(bit));
}
}
} private static int hash(String message, int index) {
//这里也可以使用其他的哈希函数来计算哈希值,不影响最终的结果
//使用md5得到加密后的字符串相当于哈希函数计算出hashCode的过程
message += index;
try {
MessageDigest md5 = MessageDigest.getInstance("md5");
byte bytes[] = message.getBytes();
md5.update(bytes);
byte bits[] = md5.digest();
BigInteger bi = new BigInteger(bits);
return Math.abs(bi.intValue()) % NUM_SLOTS;
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
return -1;
} private static boolean check(String string) {
//使用与填充位图的方法一致检查对应位上是否为一
for(int i = 0; i < NUM_HASH; i++){
int index = hash(string, i);
if(!bits.testBit(index)){
return false;
}
}
return true;
}
}
方案二
@Component
@Slf4j
public class RedisBloomUtil { @Autowired
private JedisPool jp; @PostConstruct
public void initJedis(){
jedisPool = jp;
} private static JedisPool jedisPool; private static Jedis jedis = null; /**
* 要存储的数据量
*/
private static long n = 1000000L; /**
* 容忍的错误率
*/
private static double fpp = 0.01F; /**
* bit数组的长度
*/
private static long numBits = optNumOfBits(n,fpp); /**
* hash函数的个数
*/
private static int hashNum = optNumOfHashFunction(n,numBits); /**
* 获取redis bitmap 中的数量
* @return
*/
public long getCount(){
jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
Response<Long> newsInfo = pipeline.bitcount(RedisConfig.newsCacheKey);
pipeline.sync();
Long count = newsInfo.get();
pipeline.close();
return count;
} /**
* 判断keys 是否存在集合 where中
* @param where
* @param key
* @return
*/
public static boolean isExist(String where,String key){
jedis = jedisPool.getResource();
long[] indexs = getIndex(key);
boolean flag;
// 这里同样采用管道的方式来降低过滤器运行当中访问redis的次数 降低redis并发量
Pipeline pipeline = jedis.pipelined();
try {
for (long index:indexs) {
pipeline.getbit(where,index);
}
flag = !pipeline.syncAndReturnAll().contains(false);
} finally {
pipeline.close();
}
// 不存在则放进redis的 bitmap 中
// if (!flag){
// putRedis(where,key);
// }
return flag;
} /**
* 将key存储在redis bitmap 中
* @param where
* @param key
*/
public static void putRedis(String where,String key){
jedis = jedisPool.getResource();
long[] indexs = getIndex(key);
// 这里使用redis管道来降低过滤器运行当中访问redis的次数 降低redis并发量
Pipeline pipeline = jedis.pipelined();
try {
for (long index: indexs) {
pipeline.setbit(where,index,true);
}
pipeline.sync();
// 这里可以将数据存储到mysql中
} finally {
pipeline.close();
}
Long ttl = jedis.ttl(where);
// 设置 key的过期时间 30天
if (ttl == -1 || ttl == -2){
jedis.expire(where,2592000);
}
} /**
* 根据key获取 bitmap 下表
* @param key
* @return
*/
public static long[] getIndex(String key){
long hash1 = hashOpt(key);
long hash2 = hash1 >>>16;
long[] result = new long[hashNum];
for (int i = 0; i < hashNum; i++) {
long combinedHash = hash1 + i * hash2;
if (combinedHash<0){
combinedHash = ~combinedHash;
}
result[i] = combinedHash % numBits;
}
return result;
} /**
* 获取一个hash值方法来自 guava
* @param key
* @return
*/
public static long hashOpt(String key){
Charset charset = Charset.forName("UTF-8");
return Hashing.murmur3_128().hashObject(key, Funnels.stringFunnel(charset)).asLong();
} /**
* 计算bit数组的长度
* @param n
* @param fpp
* @return
*/
private static long optNumOfBits(Long n,double fpp){
if (fpp == 0){
fpp = Double.MAX_VALUE;
}
return (long) (-n * Math.log(fpp) / (Math.log(2) *Math.log(2)) );
} /**
* 计算hash函数的个数
* @param n
* @param numBits
* @return
*/
private static int optNumOfHashFunction(long n, long numBits){
return Math.max(1,(int) Math.round((double) numBits/n * Math.log(2)));
} }

关于方案二需要的依赖:

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>

好了,时间有限 。关于方案三暂时不做说明了。redis本身现在也支持bloom过滤器。如果有时间我在编写关于方案三吧。

感兴趣的小伙伴可以微信搜索码上遇见你获取更多精彩内容。