ActiveMQ
1.ActiveMQ是什么
ActiveMQ是Apache推出的一款开源的完全支持JMS1.1和J2EE1.4规范的JMS Provider实现的消息中间件(Message Oriented Middleware,MOM)。
2.ActiveMQ能干什么
最主要的功能就是实现JMS Provider,用来帮助实现高可用,高性能,可伸缩,易用和安全的企业级面向消息服务的系统!
3.ActiveMQ特点
.完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)
.支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA.
.可插拔的体系结构,可以灵活定制,如消息存储方式,安全管理等。
.很容易和Application Server集成使用。
.多种语言和协议编写客户端,语言:Java,C,C++,C#,Ruby,Perl,Python,PHP.
.从设计上保证了高性能的集群,客户端-服务器,点对点
.可以很容易的和Spring结合使用
.支持通过JDBC和journal提供高速的消息持久化
.支持与Axis的整合
消息中间件
1.MOM的基本功能
将信息以消息的形式,从一个应用程序传送到另一个或多个应用程序
2.MOM的主要特点
1).消息异步接受,类似手机短信的行为,消息的发送者不需要等待消息接受者的响应,减少软件多系统集成的耦合度,
2).消息可靠接收,确保消息在中间件可靠保存,只有接收方收到信息后才删除消息,多个消息也可以组成原子事务
3.MOM的应用场景
在多个系统间进行整合和通信的时候,通常会要求
1).可靠传输,数据不能丢失,有的时候也会要求不能重复传输
2).异步传输,否则各个系统同步发送接收数据,互相等待,造成性能瓶颈
ActiveMQ安装和基本使用
1.下载并安装ActiveMQ服务器端
1).从http://activemq.apache.org/download-archives.html下载最新的ActiveMQ
2).直接解压,然后拷贝到你要安装的位置就可以了。
2.启动运行
1)普通启动:到ActiveMQ/bin下面,./activemq start
2)启动并指定日志文件./activemq start > /tmp/activemqlog
3.检查是否已经启动
ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服务,执行以下命令以便检验是否已经成功启动ActiveMQ服务:
1).比如查看61616端口是否打开:netstat -an | grep 61616
2).也可以直接查看控制台输出或者日志文件
3).还可以直接访问ActiveMQ的管理页面:http://192.168.125.128:8161/admin/
默认的用户名和密码是admin/admin
4.关闭ActiveMQ,可以用./activemq stop
暴力点的可以用ps-ef| grepactivemq来得到进程号,然后kill掉
基本使用
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
生产者
package com.winner.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; /** * Created by winner_0715 on 2017/1/1. */ public class QueueSend { public static void main(String[] args) throws Exception { //连接到ActiveMQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.125.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生产者生成的消息放在哪 Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); } }
运行效果
消费者
package com.winner.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; /** * Created by winner_0715 on 2017/1/1. */ public class QueueReceiver { public static void main(String[] args) throws JMSException { //连接到ActiveMQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.125.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生产者将消息发送到my-queue,所以消费者要到my-queue去取 Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i = 0; while (i < 3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); System.out.println("收到消息:" + message.getText()); } session.close(); connection.close(); } }
运行效果、
收到消息:message--0 收到消息:message--1 收到消息:message--2
我们将生产者看成A系统,消费者看成B系统,AMQ就是消息中间件。
A把消息发送至消息中间件,B从消息中间件取消息践行消费