RabbitMQ简介、安装、基本特性API--Java测试

时间:2024-01-28 18:07:43

RabbitMQ简介、安装、基本特性API--Java测试

新的阅读体验地址:http://www.zhouhong.icu/post/141

本篇文章所有的代码:https://github.com/Tom-shushu/Distributed-system-learning-notes/tree/master/rabbitmq-api-demo

一、初识RabbitMQ

是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
AMQP协议Advanced Message Queuing Protocol(高级消息队列协议)
 定义:具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,
是应用层协议的一个开放标准,为面向消息中间件设计。
AMQP专业术语:
  • Server:又称broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与broker的网络连接
  • Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
  • virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
  • Binding:  Exchange和Queue之间的虚拟链接,binding中可以包换routing key
  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)
RabbitMQ整体架构

Exchange和队列是多对多关系,实际操作一般为1个exchange对多个队列,为避免设计过于复杂.

二、单机版快速安装

  • 1、首先在Linux上进行一些软件的准备工作

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
  • 3、安装服务命令
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​
  • 4、启动

启动服务
systemctl start rabbitmq-server
查看是否启动
lsof -i:5672
  • 5、启动、安装web管理插件(管控台)

rabbitmq-plugins enable rabbitmq_management
  • 6、查看管理端口有没有启动

lsof -i:15672
或者:
netstat -tnlp | grep 15672
  • 7、添加用户

#添加用户 用户名 admin 密码 admin web管理工具可用此用户登录
sudo rabbitmqctl add_user admin admin
#设置用户角色 管理员
sudo rabbitmqctl set_user_tags admin administrator
#设置用户权限(接受来自所有Host的所有操作)
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"  
#查看用户权限
sudo rabbitmqctl list_user_permissions admin
  • 重新启动

systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management

  • 代码测试
  1. 引入依赖
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>

     2.发送端:

package com.zhouhong.rabbitmq.api.helloworld;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
    public static void main(String[] args) throws Exception {
        //    1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        //    2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //    3 创建Channel
        Channel channel = connection.createChannel();  
        //    4 声明
        String queueName = "test001";  
        //    参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
        channel.queueDeclare(queueName, false, false, false, null);
        Map<String, Object> headers = new HashMap<String, Object>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .headers(headers).build();
        for(int i = 0; i < 5;i++) {
            String msg = "Hello World RabbitMQ " + i;
            channel.basicPublish("", queueName , props , msg.getBytes());             
        }
    }
}

    3.接收端

package com.zhouhong.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();        
        Channel channel = connection.createChannel();          
        String queueName = "test001";  
        //    durable 是否持久化消息
        channel.queueDeclare(queueName, false, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //    循环获取消息  
        while(true){  
            //    获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

    4.结果(先启动接收端进行监控,再启动发送端)

收到消息:Hello World RabbitMQ 0
收到消息:Hello World RabbitMQ 1
收到消息:Hello World RabbitMQ 2
收到消息:Hello World RabbitMQ 3
收到消息:Hello World RabbitMQ 4

三、RabbitMQ----交换机

  1. Name:交换机名称。
  2. Type:交换机类型 direct、topic、fanout、headers。
  3. Durability:是否持久化,ture为持久化。
  4. Auto Delete :当最后一个绑定道Exchange上的队列删除后,自动删除该Exchange。
  5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False。
  6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用。
  7. DirectExchange的消息被转发道RouteKey中指定的Queue。
交换机-----Direct exchange
Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

代码:
  • 发送端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        //必须要和接收端 routingKey 一一对应
        String routingKey = "test_direct_routingKey";
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());                 
    }
}
  •  接收端
package com.zhouhong.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {    
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;          
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");    
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循环获取消息  
        while(true){  
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
交换机-----topic exchange
exchange 将Routekey和某个topic进行一个模糊匹配,发送给对应队列、可以用通配符进行匹配

比如下面例子
代码:
  • 接收端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange1 {
    public static void main(String[] args) throws Exception {        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;         
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();       
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // 只能匹配一个 例如:user.txt、user.py都可以,但是user.txt.py 不行
        //String routingKey = "user.*";
        // user.txt、user.py 、user.txt.py 都可以匹配到
        String routingKey = "user.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //    参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        System.err.println("consumer1 start.. ");
        //    循环获取消息  
        while(true){  
            //    获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());  
        } 
    }
}
  • 发送端
package com.zhouhong.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {    
    public static void main(String[] args) throws Exception {        
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setVirtualHost("/");        
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }    
}
交换机-----Fanout exchange 广播模式
1.不处理路由键,只需要简单的将队列绑定到交换机上。
2.发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
3.Fanout交换机转发消息是最快的。
代码:见示例文章开始GitHub地址

四、RabbitMQ高级特性

1、消息如何保障 100% 的投递成功
生产端的可靠性投递的标志:
1、消息成功发出
2、mq节点成功接收
3、发送端MQ节点确认应答
4、完善的消息补偿机制
解决:消息信息落库,对消息状态进行打标
幂等性
    1、 select count(1) from t_order where id = 唯一id(或)指纹码
    2、唯一id或指纹码机制,利用数据库主键去重
2、Confirm
第一步:再channel上开启确认模式:channel.confirmSelect();
第二步:再channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日期等后续处理!
3、return消息机制
ReturnListener用于处理不可路由的消息
我们的消息生产者,通过指定一个Exchage和Routingkey,把消息送达某一个队列中去,然后我们的消费者监听队列,进行消费处理操作,如果没有合适的队列,则会由returnListener进行接受。
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。
4、消费端ACK与重回队列
消费端ACK:
  • 在工作的时候一般不会选择自动ack
  • 消费端的手工ack分为两种ACK和NACK
  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。这种建议回复NACK,不要重回队列
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功
消费端的重回队列
  • 是为了对没有处理成功的消息,把消息重新会投递给broker。
  • 重回队列,会回到队列的尾部
  • 也会造成一条消息一直重复投递,死循环了
  • 在实际应用中,都会关闭重回队列,也就是设置为false
5、TTL队列和消息
TTL: time to live的缩写,也就是生存时间。
  • RabbitMQ 支持消息过期时间,在消息发送时可以进行指定
  • RabbitMQ支持队列过期时间,从消息入队列开始计算,只要超过了队列的超时间时间配置,那么消息会自动的清除
死队列: DLX,Dead-Letter-Exchange
  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX.
消息变成死信的几种情况
  • 消息被拒绝 并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度
DLX也是一个正常的Exchange,实际上是一个属性控制
  • 当队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上,进而被路由到另一个队列.
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitMQ3.0以前的immediate参数功能。
  • 在正常队列上添加参数:arguments.put("x-dead-letter-exchange","dlx.exchange");这样消息过期、requeue、队列达到最大长度时,就可以直接路由到死信队列。