利用redis实现消息队列之queue模式

时间:2021-04-04 17:41:10

可以利用redis存储数据类型的list类型实现消息发送与消费的一对一模式,使用lpush向list的左端推送数据(发送消息),使用rpop从右端接收数据(消费消息)。由于rpop需要周期性的从list中获取数据,可以考虑使用brpop代替rpop,brpop是一个阻塞方法,直到获取到数据。代码如下

生产者的pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.tansun</groupId>
	<artifactId>ProducerTest</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.9.0</version>
		</dependency>
	</dependencies>

</project>
生产者的主方法
package com.tansun;

import redis.clients.jedis.Jedis;

public class ProducerTest {
	
	@SuppressWarnings("resource")
	public static void main(String[] args) {
		Jedis jedis = new Jedis("192.168.229.128", 6379); 
		// 向键为“test queue”的值的左端推入数据
		jedis.lpush("test queue", "message: hello redis queue");
	}

}

消费者的pom文件与生产者相同

消费者的主方法

package com.tansun;

import java.util.List;

import redis.clients.jedis.Jedis;

public class ConsumerTest {
	
	@SuppressWarnings("resource")
	public static void main(String[] args) {
		Jedis jedis = new Jedis("192.168.229.128", 6379);  
		while(true){
			// 设置超时时间为0,表示无限期阻塞
			List<String> message = jedis.brpop(0, "test queue");
			System.out.println(message);
		}
	}

}

至此,已经实现了消息队列的queue模式发送和消费消息。

看一下brpop方法源码,发现还有一个重载的方法,源码如下:

public List<String> brpop(final int timeout, final String... keys) {
    return brpop(getArgsAddTimeout(timeout, keys));
  }
public List<String> brpop(String... args) {
    checkIsInMultiOrPipeline();
    client.brpop(args);
    client.setTimeoutInfinite();
    try {
      return client.getMultiBulkReply();
    } finally {
      client.rollbackTimeout();
    }
  }
public void brpop(final String[] args) {
    final byte[][] bargs = new byte[args.length][];
    for (int i = 0; i < bargs.length; i++) {
      bargs[i] = SafeEncoder.encode(args[i]);
    }
    brpop(bargs);
  }

protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      connect();
      Protocol.sendCommand(outputStream, cmd, args);
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
      /*
       * When client send request which formed by invalid protocol, Redis send back error message
       * before close connection. We try to read it to provide reason of failure.
       */
      try {
        String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
        if (errorMessage != null && errorMessage.length() > 0) {
          ex = new JedisConnectionException(errorMessage, ex.getCause());
        }
      } catch (Exception e) {
        /*
         * Catch any IOException or JedisConnectionException occurred from InputStream#read and just
         * ignore. This approach is safe because reading error message is optional and connection
         * will eventually be closed.
         */
      }
      // Any other exceptions related to connection?
      broken = true;
      throw ex;
    }
  }

从源码中可知这个方法可以从多个key获取元素,并且先从第一个key中获取元素,再依次向后获取,根据这个方法的特点,我们可以实现具有优先级的任务队列,代码不再赘述。

如果想了解redis实现消息队列的发布订阅模式,可以参考我的另一篇文章 http://blog.csdn.net/jia_costa/article/details/79033899