ActiveMQ做消息队列拦截功能

时间:2024-04-05 06:59:36

ActiveMQ做消息队列拦截功能

操作步骤

  1. 首先先到ActiveMQ官网下载最新的最稳定的版本 http://activemq.apache.org/activemq-5158-release.html 我下载的是Windows版本的
  2. 直接解压双击运行bin/win64/activemq.bat . 弹出黑窗口 (黑窗口不要关) 访问localhost:8161 如果出现页面 说明运行成功 登录密码为 admin /admin
  3. 测试向本地的ActiveMQ服务器发送消息
		//1.创建连接工厂  默认接收消息的端口为61616
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
		//2.创建连接
		Connection connection = connectionFactory.createConnection();
		//3.启动连接
		connection.start();
		//4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.创建主题对象 发送的主题列
		Topic topic = session.createTopic("test");
		//6.创建消息生产者对象
		MessageProducer producer = session.createProducer(topic);
		//7.创建消息对象(文本消息)
		TextMessage textMessage = session.createTextMessage("这是一条消息");
		//8.发送消息
		producer.send(textMessage);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
  1. 查看监控页面 消息已经发送成功
    ActiveMQ做消息队列拦截功能
  2. 默认的消息服务器是没有权限校验的 也就是说 谁都可以对消息队列服务器中的消息进行发送和监听 只要知道消息队列服务器的url
  3. 要想做权限校验 就需要对消息队列服务器做开发 来进行 权限的拦截 而ActiveMQ本身就支持本身做插件开发的
  4. 编写权限拦截器插件 请根据自己的需求编写校验
/**
 * 因为不同的需求有着不同的校验方式  
 * @author GEP
 * ActiveMQ消息拦截器
 */
public  class  ActivemqFilter extends BrokerFilter {
    private static final Logger logger = LoggerFactory.getLogger(ActivemqFilter.class);

	private final String PRODUCER_IDENTTIFICATION = "producer";

    private final String CUSTOMER_IDENTTIFICATION="customer";

    /**
     * 消息生产者对应的标识
     */
    private final String PRODUCER_NUMBER = "1";

    /**
     * 消息消费者对应的标识
     */
    private final String CUSTOMER_NUMBER = "2";

    /**
     * 超级管理员对应的标识
     */
    private final String SUPER_NUMBER = "0";
	//用户 这里是封装成实体对象
    private  User user ;
    //数据库连接  将用户的信息存到数据库方便进行管理
 	private JdbcTemplate jdbcTemplate;


    public ActivemqFilter(Broker next,JdbcTemplate jdbcTemplate) {
        super(next);
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * 发送消息经过拦截器 这里是创建连接之后 对发送消息进行校验
     * @param producerExchange
     * @param messageSend
     * @throws Exception
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {

        //判断是否经过创建连接
        if(this.getUser()==null){
            throw new SecurityException("请先去登录");
        }
        //获得发送时传递过来的对列名称
        String physicalName = messageSend.getDestination().getPhysicalName();
        if(StringUtils.isBlank(physicalName)){
            throw new SecurityException("请输入队列名称");
        }

        //校验用户是否有发送消息的权利
     if(PRODUCER_NUMBER.equals(this.getUser().getActiveMQStatus())||SUPER_NUMBER.equals(this.getUser().getActiveMQStatus())){

            //校验用户是否有对该队列的发送权利
            if(!physicalName.equals(this.getUser().getActiveMQQueuesName())){
                throw new SecurityException("你没有"+physicalName+"队列的发送权利");
            }

            //进行数据封装
            String data = null;

            //判断消息类型
            //如果是文本类型的消息
            if(messageSend instanceof ActiveMQTextMessage){
                ActiveMQTextMessage message=(ActiveMQTextMessage)messageSend;
                data = message.getText();
            //如果是Map类型的消息
            }else if(messageSend instanceof ActiveMQMapMessage){
                ActiveMQMapMessage mapMessage = (ActiveMQMapMessage)messageSend;
                data = mapMessage.getContentMap().toString();
            }else{
                throw new SecurityException("暂不支持该类型的消息");
            }

            //设置消息的发送结果
            String status = null;
            try {
                super.send(producerExchange, messageSend);
                //1表示发送成功
                status="1";
            }catch (Exception e){
                //0表示发送失败
                status="0";
            }

            //封装消息记录
           	...
            //对数据库进行插入 将发送消息的记录 记录到数据库中 
           jdbcTemplate.update(...);
        }else{
            throw new SecurityException("您没有发送消息的权利");
        }
    }


    /**
     * 消费消息经过拦截器
     * @param consumerExchange
     * @param ack
     * @throws Exception
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {


        //判断是否经过创建连接
        if(this.getUser()==null){
            throw new SecurityException("请先去登录");
        }
        //判断队列名是否为空
        String physicalName = ack.getDestination().getPhysicalName();
        if(StringUtils.isBlank(physicalName)){
            throw new SecurityException("请输入队列名称");
        }

        //判断用户是否有有接受消息的权利
        if (CUSTOMER_NUMBER.equals(this.getUser().getActiveMQStatus()) || SUPER_NUMBER.equals(this.getUser().getActiveMQStatus())) {

            //校验用户是否有对该队列的接受权利
            if(!physicalName.equals(this.getUser().getActiveMQQueuesName())){
                throw new SecurityException("你没有"+physicalName+"队列的接收权利");
            }

            //判断消息消费状态
            String status = null;
            try {
                super.acknowledge(consumerExchange, ack);
                status = "1";
            } catch (Exception e) {
                status = "0";
            }

             //封装消息记录
           	...
            //对数据库进行插入 将发送消息的记录 记录到数据库中 
           jdbcTemplate.update(...);
        } else {
            throw new SecurityException("您没有接收消息的权利");
        }
    }

    /**
     * 创建链接的时候进行校验
     * @param context
     * @param info
     * @throws Exception
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        //获得连接是对方传的用户名和密码
        String userName = info.getUserName().trim();
        String password = info.getPassword().trim();

        logger.info(userName+"请求连接");

        //用户校验
        auth(userName,password);

        //创建连接
        super.addConnection(context, info);
    }


    /**
     * 用户校验  具体实现需要根据需求来进行编写 请不要直接复制粘贴
     * @param userName
     * @param password
     */
    public  void  auth(String userName,String password){
        //如果用户名密码为空
        if(StringUtils.isBlank(userName)||StringUtils.isBlank(password)){
            throw new SecurityException("用户名或密码不能为空");
        }
        
        //进行数据库查询
        String sql = 查询自己数据库中的用户;
        List<User> users  = jdbcTemplate.query(sql, new UserMapper(), userName, password);

        //如果没查到
        if(isEmpty(users)){
            throw new SecurityException("用户名或密码错误");
        }
		
        this.setUser(users.get(0));

        //校验用户的连接权限  校验用户的时间段 校验用户的状态 这里需要自己来编写
        ...
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
    }

    /**
     *  判断是否查到用户
     */
    public boolean isEmpty(List list){
        if(list==null){
            return true;
        }
            return list.size()==0;
    }

}
  1. 编写注册插件的注册类
/**
* @author GEP
* 自定义消息插件
*/
public class ActivemqPlugin implements BrokerPlugin {


   //需要一个连接数据可的jdbcTemplate对象
   private JdbcTemplate jdbcTemplate;


   public JdbcTemplate getJdbcTemplate() {
       return jdbcTemplate;
   }


   public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
       this.jdbcTemplate = jdbcTemplate;

   }

   public ActivemqPlugin(JdbcTemplate jdbcTemplate) {

       this.jdbcTemplate = jdbcTemplate;
   }
   public ActivemqPlugin() {


   }

   /**
    * 注册插件
    * @param broker
    * @return
    * @throws Exception
    */
   public Broker installPlugin(Broker broker) throws Exception {
       return new ActivemqFilter(broker,jdbcTemplate);
   }


}
  1. 将编写好的代码打成jar包放到ActiveMQ的lib包下,还要放入数据库连接所需的jar包
    ActiveMQ做消息队列拦截功能
  2. 编辑ActiveMQ conf/activemq.xml 文件

添加读取数据库连接配置db,propertiesActiveMQ做消息队列拦截功能
注册springjdbcTemplate
ActiveMQ做消息队列拦截功能
注册插件
ActiveMQ做消息队列拦截功能
11. 编写将db.properties文件 并将其放到conf 目录下
ActiveMQ做消息队列拦截功能
12.重启ActiveMQ 测试发送 (数据库中必须先有符合自己校验规则的用户对象)

先不带用户名密码的
ActiveMQ做消息队列拦截功能
直接报没有校验信息的错误 再试试错误的用户名密码试试
ActiveMQ做消息队列拦截功能
用户名密码错误

再试试正确的用户名密码 错误的队列名称
ActiveMQ做消息队列拦截功能

再试试正确的用户名正确的队列名
ActiveMQ做消息队列拦截功能
没有报错发送成功(有一条是没做权限之前发送的)
ActiveMQ做消息队列拦截功能
搞定