ActiveMQ学习--001--ActiveMQ和消息中间件

时间:2024-01-08 12:11:14

一、ActiveMQ简介

ActiveMQ学习--001--ActiveMQ和消息中间件

1,ActiveMQ是什么

  ActiveMQ是Apache推出的开源的,完全支持JMS1.1和J2EE 1.4规范的JMS  Provider实现的消息中间件(MOM)

2ActiveMQ能干什么

  最主要功能就是,实现 JMS Provider,用来帮助实现高可用、搞性能、可伸缩、易用和安全的企业级面向消息服务的系统。(系统解耦)

3,ActiveMQ的特点

  完全支持JMS1.1 和J2EE 1.4 规范(持久化、XA消息、事物)

  支持多种传输协议: in-VM ,TCP,SSL,NIO,UDP,JGroups,JXTA

  可插拔的体系结构,可以灵活定制,如:消息存储方式、安全管理等

  很容易和Application Server 集成使用

  多种语言和协议编写客户端。语言:java、C、C#、Ruby、Perl、Python、PHP

  从设计上报纸了高性能的集群,客户端-服务器,点对点

  可很容易和Spring集成

  支持通过JDBC和Journal提供高速的消息持久化

  支持和Axis的整合

二、消息中间件(Message Oriented Middleware,MOM)

MOM基本功能:

  将信息以消息的形式,从一个应用程序传送到另一个或多个应用程序。
MOM主要特点:

  1,消息异步接受,类似手机短信的行为,消息发送者不需要等待消息接受者的响应,减少软件多系统集成的耦合度;

  2 ,消息可靠接收,确保消息在中间件可靠保存,只有接收方收到后才删除消息,多个消息也可以组成原子事务

消息中间件的主要应用场景:

  在多个系统间进行整合和通讯的时候,通常会要求:

  1 :可靠传输,数据不能丢失,有的时候,也会要求不能重复传输;

   2 :异步传输,否则各个系统同步发送接受数据,互相等待,造成系统瓶颈

目前比较知名的消息中间件:

  ZeroMQ 、RabbitMQ、ActiveMQ、Kafka、RocketMQ

三、Centos安装ActiveMQ

1,下载linux安装包,上传到centos,直接解压并用mv命令重命名 mv apache-activemq-5.15.3   activemq-5.15

ActiveMQ学习--001--ActiveMQ和消息中间件

2,启动运行

a,普通启动,到bin目录下  ./activemq start

b,启动并指定日志文件 ./activemq start > /tmp/activemqlog

ActiveMQ学习--001--ActiveMQ和消息中间件

3,检查是否已经启动

  ActiveMQ默认采用61616端口提供JMS服务,使用8161提供管控台服务,执行以下爱命令检验是否启动成功:

  1,查看61616端口是否打开: netstat -an | grep 61616

  2,也可以直接查看控制台输出或者日志文件

  3,访问管控台 http://ip地址:8161/admin  默认用户名密码admin/admin

4,关闭ActiveMQ,直接使用 ./activemq stop

   暴力关闭,ps -ef | grep activemq 获取进程号 然后 kill 掉

四、helloworld程序

http://www.cnblogs.com/lihaoyang/p/8856636.html

五、JMS基本概念

一基本概念

JMS是什么

  JMS Java Message Service,Java消息服务,是J2EE的一个技术

JMS规范:JMS定义了Java访问消息中间件的接口,一流的公司定义规范,厂商去实现吧!实现JMS接口的消息中间件称为JMS Provider 。

JMS Message:JMS的消息,三部分组成:

  1,消息头,有对应的getter、setter

  2,消息属性:如果需要除消息头字段以外的值,可以使用消息属性

  3,消息体:封装具体的消息数据

JMS  producer:消息生产者,创建发送消息

JMS   consumer:消息接收者,接收处理消息

消息的消费可采用两种方式:

  1,同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive方法是阻塞的,一直阻塞到有消息到达。

  2,异步消费:客户可以为消息消费者注册一个消息监听器,以定义在消息到达时采取的处理

二 消息传递域

JMS  domains :消息传递域,JMS规范中定义了两种消息传递域,

    1,点对点(point-to-point,PTP) ;

     2,发布/订阅(publish/subscribe,简写pub/sub)

  2.1 点对点特点:

    (1)每个消息只能有一个消费者(一个消费者把消息消费,消息就从队列消失)

    (2)消息生产者和消费者之间没有时间上的相关性。发消息和接收消息都不管对方的状态

ActiveMQ学习--001--ActiveMQ和消息中间件

                      (点对点)

          2.2 发布/订阅 消息特点

    (1)每个消息可以有多个消费者

    (2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费从它订阅之后发布的消息。

       分 持久订阅:允许消费者消费它离线状态时  topic发布的消息

           非持久订阅:只能消费消费者在线时 topic发布的消息

      ActiveMQ学习--001--ActiveMQ和消息中间件

                        (topic/sub模式)

点对点消息传递域中,目的地被称为 队列(queue),在 发布/订阅 消息传递域中,目的地被称为 主题(topic)

JMS接口相关概念

Connection factory:连接工厂,用来创建连接对象,以连接到JMS的provider

JMS Connection:封装了客户与JMS 提供者之间的一个虚拟连接

JMS Session  :是生产和消费消息的一个单线程上下文,用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在此上下文中,一组发送和接收被组合在了一个原子操作中。

Destination:消息发送到的目的地

Acknowledge:签收消息

Transaction:事物

JMS client:用来收发消息的java应用

Non-JMS  client:使用JMS provider本地API写的应用,用来替代JMS API实现收发消息的功能,通常提供一些其他特殊属性,如CORBA、RMI等

Administered objects:预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,如ConnectionFactory  、Destination

 JMS的消息结构-1

JMS 消息组成:消息头、属性、消息体

消息头包含:消息的识别信息、路由信息。消息头包含一些标准的属性如下

  1,JMSDestination :由 send方法设置

  2,JMSDeliveryMode:由send方法设置

  3,JMSExpiration:由send方法设置

  4,JMSPriority:由send方法设置

  5,JMSMessageID:由send方法设置

  6,JMSTimestamp:由客户端设置

  7,JMSCorrelationID:由客户端设置

  8,JMSReplyTo:由客户端设置

  9,JMSType:由客户端设置

  10,JMSRedelivered:由 JMS Provider设置

标准的JMS消息头包含以下属性:

1,JMSDestination:消息发送目的地,Queue或者Topic , 自动分配

2,JMSDeliveryMode:传送模式。

  两种:持久模式、非持久模式。持久性消息:JMS提供者出现故障,消息不会丢失,服务器恢复后会再次传输;非持久性消息:服务器故障消息丢失

3,JMSExpiration:消息过期时间,等于Destination的 send 方法中的 timeToLive 值加上发送时刻的GMT时间值。若timeToLive 为零,则JMSExpiration 被设为零,表示消息永不过期。过期时间到后如果消息还没发送到目的地,消息会被清除。自动分配

4,JMSPriority:消息优先级,值为0--9十个级别,0--4 是普通消息,5--9 是加急消息。默认值是4

5,JMSMessageID:每个消息唯一标识,由JMS Provider 产生。自动分配

6,JMSTimestamp:一个JMS Provider 在调用send 方法时自动设置的,消息被发送到接收的时间差,自动分配。

7,JMSCorrelationID:用来连接到另外一个消息,典型应用是在回复消息中连接到原消息。一般标记为JMSMessageID标识上一条消息的应答,也可以自己设置。

8,JMSReplyTo:提供本消息 消息的目的地的地址,开发者设置

9,JMSType:消息类型识别符,开发者设置

10,JMSRedelivered:如果一个客户端收到了一个设置了JMSRedelivered 属性的消息,则表示可能客户端之前收到过该消息但是没有签收(acknowledged)。如果该消息被重新发送,JMSRedelivered  = true 反之JMSRedelivered  = false ,自动设置。

 消息体:

JMS API定义了5种消息格式:TextMessage 、MapMessage、BytesMessage、StreamMessage和ObjectMessage

消息属性:三种类型

  1,自定义的属性,如  Message.setStringProperty("username",username);

  2,JMS定义的属性  使用 “JMSX” 作为属性名前缀 connection.getMetaData().getJMSXPropertyNames() ,返回所有连接支持的JMSX 属性的名字

  3,JMS供应商特定的属性

JMS定义的属性如下:

JMSXUserID:发送消息的用户标识,发送时提供商设置
JMSXGroupID:消息所在消息组的标识,由客户端设置
JMSXGroupSeq:组内消息的序号 第一个消息是1,依次是2,3,,,由客户端设置
JMSXDeliveryCount:转发消息重试次数,依次是1,2,3,,,发送时提供商设置
JMSXProducerTXID:产生消息的事务的事务标识,发送时提供商设置

JMSXAppID:发送消息的应用的标识,发送时提供商设置

JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置

JMSXRecvTimestamp:JMS转发消息到消费者的时间,接收时提供商设置

1,消息接收确认

  JMS消息只有在被确认之后,才认为被成功的消费了。消息成功消费通常包含三个阶段:客户接收消息、客户处理消息、消息被确认

  在事务性会话中:当一个事务(消费端)被提交的时候,确认自动发生。

  示例代码片段

  Sender :

  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

  producer.send(destination, message);

  session.commit();

Consumer:

  final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

  TextMessage message = (TextMessage) consumer.receive();

  // session.commit();

  如果Sender和Consumer都设置事务,自动签收,当Consumer不commit事务时,能够一直重复接收Sender 发送的消息,反之,只要调用了session.commit(); 就不能再接收之前发送的消息,认为是已确认了。

这里的事务指的是 消费端的事务,发送者和接收者的事务不是一回事,我认为发送端的事务只是控制发送消息的方式

  非事务性会话:消息何时被确认取决于创建会话时的应答模式(acknowledge mode)

  Session.AUTO_ACKNOWLEDGE:当 consumer.receive(); 之后,或者从MessageListener.onMessage方法成功返回后,会话自动确认客户收到的消息,就不能再接收刚才sender发送的消息了。

  Session.CLIENT_ACKNOWLEDGE:消费端调用 message.acknowledge(); 方法后,消息就被确认了。注意这种模式实在会话层进行的,只要确认一个消息将自动确认所有已被会话消费的消息,如一共10个消息,消费了5个,第6个确认,那么前5个消息都将被确认。下次接收只能接收到后四个。

  Session.DUPS_OK_ACKNOWLEDGE:不常用

消息持久性