基于redis队列实现的生产者消费者

时间:2021-09-01 17:41:40

一.简介

基于redis队列的生产者消费者实现主要是利用redis的blpop或者brpop命令,以下是官方文档对这两个命令的描述:

BLPOP 是列表的阻塞式(blocking)弹出原语。

它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

BRPOP的描述差不多,这里就不重复了。


那么有了这两个命令,实现生产者消费者模式就有思路了,我们从外界数据源不停的传入数据到redis指定的list里面,此时不管有没有消费者,我们的数据是会存储在list里的。

然后消费者的程序只需要调用blpop命令,如果指定的list里面有数据,就能从里面取得list最左边的数据;如果指定的list里面没有数据,那么就会阻塞在那,直到list里面来了新数据或者已经达到阻塞时间为止。

二.普通生产者消费者代码:

生产者我们就用自己生成的数据模仿。

public class RedisProducer {
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis=JavaRedisUtils.getJedis();
        jedis.select(4);
        int count=0;
        while(count<100){
            Thread.sleep(300);
            jedis.lpush("mylist",String.valueOf(count));
            count++;
        }
        jedis.close();
    }
}
然后消费者得集成Thread类,重写run方法,我们可以在run方法里面写一些对取出来的数据需要进行的业务操作,我这里就是简单的打印出来判断是否取出数据。
public class Consumer extends Thread{
    String name;

    public Consumer(String name) {
        this.name = name;
    }

    @Override
    public void run(){
            Jedis jedis = JavaRedisUtils.getJedis();
        while(true) {

            jedis.select(4);
            //阻塞式brpop,List中无数据时阻塞
            //参数0表示一直阻塞下去,直到List出现数据
            List<String> list = jedis.blpop(0, "mylist");
            for(String s : list) {
                System.out.println(name+"   "+s);
            }
            jedis.close();

        }
    }
}

下面是程序的consumer执行类:

public class RedisConsumer {
    public static void main(String[] args) {
        Consumer mq1=new Consumer("mq1");
        Consumer mq2=new Consumer("mq2");
        mq1.start();
        mq2.start();

    }

下面是程序运行部分结果:

基于redis队列实现的生产者消费者

我们可以从结果中看到,我们的消费者是真的取到了数据并且在原始没有数据的时候,我们的消费者是阻塞了的,直到新数据来临才继续取数据。

为了更加方便的观看到生产者和消费者的程序执行情况,我们将从"mylist"中的消费数据利用redis的brpoplpush命令将数据从mylist消费到各个消费者自己名字的列表中。

下面是brpoplpush的解释:

BRPOPLPUSH 是 RPOPLPUSH 的阻塞版本,当给定列表 source 不为空时, BRPOPLPUSH 的表现和 RPOPLPUSH 一样。

当列表 source 为空时, BRPOPLPUSH 命令将阻塞连接,直到等待超时,或有另一个客户端对 source 执行 LPUSH 或 RPUSH 命令为止。

超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长(block indefinitely) 。

返回值:
假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长。
反之,返回一个含有两个元素的列表,第一个元素是被弹出元素的值,第二个元素是等待时长。
public class Consumer extends Thread{
    String name;

    public Consumer(String name) {
        this.name = name;
    }

    @Override
    public void run(){
        Jedis jedis = JavaRedisUtils.getJedis();
        jedis.select(4);
        while(true) { 
//调用brpoplpush方法 从mylist取出来然后放到对应name的list去
            String a=jedis.brpoplpush("mylist",name,0);
        }
    }
}

运行程序之后,redis库中出现了mq1以及mq2的list,并且他们分别消费了mylist中的所有数据:

基于redis队列实现的生产者消费者

以及它们分别消费的数目:

基于redis队列实现的生产者消费者

三.在消费过程中新增加消费者

上面我们已经做过实验了,它能够做到生产者和消费者能做到的事情:当list没有数据的时候,消费者会阻塞,当list新来数据的时候,它会接着进行消费。那么当新来一个新的消费者的时候,它会有什么变化呢?

新加入消费者的代码如下:

public class addconsumer {
    public static void main(String[] args) {
        Consumer mq3=new Consumer("mq3");
        mq3.start();
    }
}

下面我们先运行生产者,紧接着运行消费者mq1和mq2,等它们消费一段时间,并且生产者数据还在传输的时候,我们开启消费者mq3。让我们来看看结果会是怎么样。

redis数据库中产生了三个列表。

基于redis队列实现的生产者消费者

它们分别的数据量为:

基于redis队列实现的生产者消费者

说明,当新加入消费者的时候,它会和其它两个消费者内部竞争,然后一起消费没有消费过的数据。


以上是redis队列实现的消费者和生产者demo,希望可以给大家提供到帮助。