概述
对Acknowledge机制进行测试。
此处的测试是针对Consumer的确认设计的;对于Producer的确认是透明的,无法提供测试。
测试实例
设计demo,测试三种确认机制。
测试机制 | 测试实例 | 结果预测 |
AUTO_ACKNOWLEDGE | 接收正常 | 消息出队量=消息入队量 |
接收异常 | 消息出队量=0 | |
CLIENT_ACKNOWLEDGE | 1次确认/2条消息 - 每2条消息确认1次 | 每次确认2条消息 |
从不确认 | 消息出队量=0 | |
DUPS_OK_ACKNOWLEDGE | 每一次接收消息后,使线程睡眠数秒;观察消息出队情况 | 符合批量确认、延迟确认的特点 |
demo设计
demo设计图

测试分工
测试类 | 测试方法 |
AutoAckConsumer.java - 测试AUTO_ACKNOWLEDGE |
receiveNormal():void - 测试“接收正常” |
receiveIntentionalException():void - 测试“接收异常” |
|
ClientAckConsumer.java - 测试CLIENT_ACKNOWLEDGE |
receivePerTwice():void - 测试“1次确认/2条消息” |
receiveWithoutAck():void - 测试“从不确认” |
|
DupsOkAckConsumer.java - 测试DUPS_OK_ACKNOWLEDGE |
receive():void - 测试批量确认和延迟确认 |
测试步骤和结果
1.测试AUTO_ACKNOWLEDGE
1.1.接收正常
测试步骤 |
|
测试截图 |
![]() |
1.2.接收异常
测试步骤 |
|
测试截图 |
![]() |
结论整理 |
|
2.测试CLIENT_ACKNOWLEDGE
2.1.每2条消息确认1次
测试步骤 |
|
测试截图 |
![]() |
结论整理 |
每次确认不是只对当前的Message进行确认,而是对自上次确认以来的所有Message进行确认.在这里,每次确认2条. |
2.2.从不确认
测试步骤 |
|
测试截图 |
![]() |
3.测试DUPS_OK_ACKNOWLEDGE
测试步骤 |
|
结论整理 |
|
代码
文件目录结构
jms-producer
|---- src/main/resources/
|---- jndi.properties
|---- src/main/java/
|---- cn.sinobest.asj.producer.jms.acknowledge
|---- SimpleProducer.java # 发送
jms-consumer
|---- src/main/resources/
|---- jndi.properties
|---- src/main/java/
|---- cn.sinobest.asj.consumer.jms.acknowledge
|---- AutoAckConsumer.java # 测试AUTO_ACKNOWLEDGE
|---- ClientAckConsumer.java # 测试CLIENT_ACKNOWLEDGE
|---- DupsOkAckConsumer.java # 测试DUPS_OK_ACKNOWLEDGE
文件内容
1.jndi.properties
jms-producer端
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
# use the following property to configure the default connector
java.naming.provider.url=tcp://localhost:61616
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.exampleQueue=example.queue
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.exampleTopic=example.topic
jms-consumer端
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
# use the following property to configure the default connector
java.naming.provider.url=tcp://localhost:61616
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.exampleQueue=example.queue
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.exampleTopic=example.topic
2.SimpleProducer.java
package cn.sinobest.asj.producer.jms.acknowledge;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.junit.Test;
/**
 * A simple demo producer client that sends text messages to ActiveMQ.<br>
 * An improved version of
 * {@link cn.sinobest.asj.producer.jms.clientmode.SimpleProducer}.
 *
 * @author lijinlong
 */
public class SimpleProducer {
    /** JNDI name for ConnectionFactory */
    static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
    /** JNDI name for Queue Destination (use for PTP Mode) */
    static final String QUEUE_JNDI_NAME = "exampleQueue";
    /** JNDI name for Topic Destination (use for Pub/Sub Mode) */
    static final String TOPIC_JNDI_NAME = "exampleTopic";
    /** Number of messages sent by one call to {@link #send(String)}. */
    private static final int MESSAGE_COUNT = 3;

    /**
     * Sends the messages to the queue.<br>
     * PTP Mode.
     */
    @Test
    public void sendToQueue() {
        send(QUEUE_JNDI_NAME);
    }

    /**
     * Sends the messages to the topic.<br>
     * Pub/Sub Mode.
     */
    @Test
    public void sendToTopic() {
        send(TOPIC_JNDI_NAME);
    }

    /**
     * Sends {@link #MESSAGE_COUNT} text messages to the given destination.
     * <p>
     * A JNDI failure is reported on standard output and terminates the JVM
     * with exit status 1. A {@link JMSException} while sending is printed and
     * the method returns after releasing its resources.
     *
     * @param destJndiName
     *            JNDI name of the destination: {@link #QUEUE_JNDI_NAME} or
     *            {@link #TOPIC_JNDI_NAME}.
     */
    private void send(String destJndiName) {
        Context jndiContext = null;
        ConnectionFactory connectionFactory = null;
        Destination destination = null;
        Connection connection = null;
        Session session = null;

        // create a JNDI API InitialContext object
        try {
            jndiContext = new InitialContext();
        } catch (NamingException e) {
            System.out.println("Could not create JNDI Context:"
                    + e.getMessage());
            System.exit(1);
        }

        // look up ConnectionFactory and Destination
        try {
            connectionFactory = (ConnectionFactory) jndiContext
                    .lookup(CONNECTION_FACTORY_JNDI_NAME);
            destination = (Destination) jndiContext.lookup(destJndiName);
        } catch (NamingException e) {
            System.out.println("JNDI look up failed:" + e.getMessage());
            System.exit(1);
        }

        // send the messages and finally release the resources
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            // a single message object is reused; only its text changes
            TextMessage message = session.createTextMessage();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                message.setText(String.format("This is the %dth message.",
                        i + 1));
                producer.send(message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            release(session, connection, jndiContext);
        }
    }

    /**
     * Closes the given resources, each in its own try block, so that a
     * failure while closing one of them cannot prevent the others from being
     * closed. Any argument may be {@code null}.
     */
    private static void release(Session session, Connection connection,
            Context jndiContext) {
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (jndiContext != null) {
            try {
                jndiContext.close();
            } catch (NamingException e) {
                e.printStackTrace();
            }
        }
    }
}
SimpleProducer.java
3.AutoAckConsumer.java
package cn.sinobest.asj.consumer.jms.acknowledge;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.junit.Test;
import cn.sinobest.asj.consumer.util.Hold;
/**
* AUTO_ACKNOWLEDGE确认模式的Consumer.<br>
* 基于PTP Mode,采用异步的方式接收消息,研究抛出或不抛出异常的情况下,Queue中的消息的出队情况.<br>
*
* @author lijinlong
*
*/
public class AutoAckConsumer {
/** JNDI name for ConnectionFactory */
static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
/** JNDI name for Queue Destination (use for PTP Mode) */
static final String QUEUE_JNDI_NAME = "exampleQueue";
/**
* 正常的接收.<br>
*/
@Test
public void receiveNormal() {
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
};
receive(listener);
}
/**
* 故意抛出异常的接收.<br>
* 结果:
* <ul>
* <li>JMS Provider重复发送消息给Consumer。重复次数达到一定的阀值,JMS
* Provider认为此消息无法消费,此消息将会被删除或者迁移到"dead letter"通道中。</li>
* <li>在测试过程中,会重发6次(共发7次),然后移到ActiveMQ.DLQ队列;DLQ - dead letter queue.</li>
* <li>重发次数可以配置 -
* 在brokerUrl中指定参数jms.redeliveryPolicy.maximumRedeliveries=3,则重发3次(共4次).</li>
* </ul>
*/
@Test
public void receiveIntentionalException() {
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
boolean intentional = true;
if (intentional) {
throw new RuntimeException("故意抛出的异常。");
}
}
};
receive(listener);
} /**
* 接收消息.<br>
*
* @param listener
* 监听器,如果消息接收成功,将被回调.
*/
private void receive(MessageListener listener) {
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
// create a JNDI API IntialContext object
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println("Could not create JNDI Context:"
+ e.getMessage());
System.exit(1);
}
// look up ConnectionFactory and Destination
try {
connectionFactory = (ConnectionFactory) jndiContext
.lookup(CONNECTION_FACTORY_JNDI_NAME);
destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME);
} catch (NamingException e) {
System.out.println("JNDI look up failed:" + e.getMessage());
System.exit(1);
}
// receive Messages and finally release the resources.
try {
connection = connectionFactory.createConnection();
connection.start(); // connection should be called in
// receiver-client
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
// key code for asynchronous receive:set messageListener
consumer.setMessageListener(listener);
Hold.hold(); // 阻塞程序继续执行
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (session != null)
session.close();
if (connection != null)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
AutoAckConsumer.java
4.ClientAckConsumer.java
package cn.sinobest.asj.consumer.jms.acknowledge;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.junit.Test;
import cn.sinobest.asj.consumer.util.Hold;
/**
* CLIENT_ACKNOWLEDGE确认模式的Consumer.<br>
* 基于PTP Mode,采用异步的方式接收消息,研究从不确认、每2次确认的情况下,Queue中的消息的出队情况.<br>
*
* @author lijinlong
*
*/
public class ClientAckConsumer {
/** JNDI name for ConnectionFactory */
static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
/** JNDI name for Queue Destination (use for PTP Mode) */
static final String QUEUE_JNDI_NAME = "exampleQueue";
/**
* 从不确认的接收.<br>
* 结果:
* <ul>
* <li>只接收一次,但是消息不会出队.</li>
* <li>Consumer重启,会再次接收到消息.</li>
* </ul>
*/
@Test
public void receiveWithoutAck() {
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
};
receive(listener);
} private int ack_count = 0; // 确认次数统计
/**
* 每接收两次确认一次.<br>
* 结果:每次确认不是只对当前的Message进行确认,而是对自上次确认以来的所有Message进行确认.在这里,每次确认2条.
*/
@Test
public void receivePerTwice() {
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text); ack_count ++;
if (ack_count % 2 == 0)
message.acknowledge(); } catch (JMSException e) {
e.printStackTrace();
}
}
};
receive(listener);
}
/**
* 接收消息.<br>
*
* @param listener
* 监听器,如果消息接收成功,将被回调.
*/
private void receive(MessageListener listener) {
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
// create a JNDI API IntialContext object
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println("Could not create JNDI Context:"
+ e.getMessage());
System.exit(1);
}
// look up ConnectionFactory and Destination
try {
connectionFactory = (ConnectionFactory) jndiContext
.lookup(CONNECTION_FACTORY_JNDI_NAME);
destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME);
} catch (NamingException e) {
System.out.println("JNDI look up failed:" + e.getMessage());
System.exit(1);
}
// receive Messages and finally release the resources.
try {
connection = connectionFactory.createConnection();
connection.start(); // connection should be called in
// receiver-client
session = connection.createSession(Boolean.FALSE,
Session.CLIENT_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
// key code for asynchronous receive:set messageListener
consumer.setMessageListener(listener);
Hold.hold(); // 阻塞程序继续执行
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (session != null)
session.close();
if (connection != null)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
ClientAckConsumer.java
5.DupsOkAckConsumer.java
package cn.sinobest.asj.consumer.jms.acknowledge;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.junit.Test;
import cn.sinobest.asj.consumer.util.Hold;
/**
* DUPS_OK_ACKNOWLEDGE确认模式的Consumer.<br>
* @author lijinlong
*
*/
public class DupsOkAckConsumer {
/** JNDI name for ConnectionFactory */
static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
/** JNDI name for Topic Destination (use for Pub/Sub Mode) */
static final String TOPIC_JNDI_NAME = "exampleTopic"; /**
* 从主题接收消息.
*/
@Test
public void receive() {
receive(createMessageListener());
} /**
* 创建MessageListener实例.
* @return
*/
private MessageListener createMessageListener() {
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
} try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}; return listener;
} /**
* 接收消息.<br>
*
* @param listener
* 监听器,如果消息接收成功,将被回调.
*/
private void receive(MessageListener listener) {
Context jndiContext = null;
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
// create a JNDI API IntialContext object
try {
jndiContext = new InitialContext();
} catch (NamingException e) {
System.out.println("Could not create JNDI Context:"
+ e.getMessage());
System.exit(1);
}
// look up ConnectionFactory and Destination
try {
connectionFactory = (ConnectionFactory) jndiContext
.lookup(CONNECTION_FACTORY_JNDI_NAME);
destination = (Destination) jndiContext.lookup(TOPIC_JNDI_NAME);
} catch (NamingException e) {
System.out.println("JNDI look up failed:" + e.getMessage());
System.exit(1);
}
// receive Messages and finally release the resources.
try {
connection = connectionFactory.createConnection();
connection.start(); // connection should be called in
// receiver-client
session = connection.createSession(Boolean.FALSE,
Session.DUPS_OK_ACKNOWLEDGE);
consumer = session.createConsumer(destination);
// key code for asynchronous receive:set messageListener
consumer.setMessageListener(listener);
Hold.hold(); // 阻塞程序继续执行
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (session != null)
session.close();
if (connection != null)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
DupsOkAckConsumer.java