消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

时间:2023-03-08 21:22:02
消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

1、实现功能

希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用

1、发送Topic

2、发送Queue

3、接收Topic

4、接收Queue

2、接口设计

根据功能设计公共调用接口

  1. /**
  2. * 数据分发接口(用于发送、接收消息队列数据)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public interface MsgDistributeInterface {
  8. /**
  9. * 发送到主题
  10. *
  11. * @param topicName -主题
  12. * @param data -数据
  13. * @return
  14. */
  15. public boolean sendTopic(String topicName, byte[] data);
  16. /**
  17. * 发送到主题
  18. * @param topicName -主题
  19. * @param data-数据
  20. * @param offset -偏移量
  21. * @param length -长度
  22. * @return
  23. */
  24. boolean sendTopic(String topicName, byte[] data, int offset, int length);
  25. /**
  26. * 发送到队列
  27. *
  28. * @param queueName -队列名称
  29. * @param data -数据
  30. * @return
  31. */
  32. public boolean sendQueue(String queueName, byte[] data);
  33. /**
  34. * 发送到队列
  35. * @param queueName -队列名称
  36. * @param data -数据
  37. * @param offset
  38. * @param length
  39. * @return
  40. */
  41. public boolean sendQueue(String queueName, byte[] data,int offset, int length);
  42. /**
  43. * 接收队列消息
  44. * @param queueName 队列名称
  45. * @param listener
  46. * @throws JMSException
  47. */
  48. void receiveQueue(String queueName, MessageListener listener) throws JMSException;
  49. /**
  50. * 订阅主题
  51. * @param topicName -主题名称
  52. * @param listener
  53. * @throws JMSException
  54. */
  55. void receiveTopic(String topicName, MessageListener listener) throws JMSException;
  56. }

3、基于ActiveMQ的接口实现

  1. /**
  2. * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public class ActiveMQImpl implements MsgDistributeInterface {
  8. private String userName;
  9. private String password;
  10. private String brokerURL;
  11. private boolean persistentMode;//持久化模式
  12. //连接工厂
  13. ConnectionFactory connectionFactory;
  14. //发送消息的线程
  15. Connection connection;
  16. // 事务管理
  17. Session session;
  18. //存放各个线程订阅模式生产者
  19. ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
  20. //存放各个线程队列模式生产者
  21. ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
  22. public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
  23. this(userName, password, brokerURL, true);
  24. }
  25. public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
  26. this.userName = userName;
  27. this.password = password;
  28. this.brokerURL = brokerURL;
  29. this.persistentMode=persistentMode;
  30. init();
  31. }
  32. public void init() throws JMSException {
  33. try {
  34. // 创建一个链接工厂
  35. connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
  36. // 从工厂中创建一个链接
  37. connection = connectionFactory.createConnection();
  38. // 开启链接
  39. connection.start();
  40. // 创建一个事务(订阅模式,事务采用自动确认方式)
  41. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  42. } catch (JMSException e) {
  43. throw e;
  44. }
  45. }
  46. @Override
  47. public boolean sendTopic(String topicName, byte[] data) {
  48. return sendTopic(topicName, data, 0, data.length);
  49. }
  50. @Override
  51. public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
  52. return send(true, topicName, data, offset, length);
  53. }
  54. @Override
  55. public boolean sendQueue(String queueName, byte[] data) {
  56. return sendQueue(queueName, data, 0, data.length);
  57. }
  58. @Override
  59. public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
  60. return send(false, queueName, data, offset, length);
  61. }
  62. /**
  63. * 发送数据
  64. *
  65. * @param name
  66. * @param data
  67. * @param offset
  68. * @param length
  69. * @param type
  70. * -类型
  71. * @return
  72. */
  73. private boolean send(boolean type, String name, byte[] data, int offset, int length) {
  74. try {
  75. MessageProducer messageProducer = getMessageProducer(name, type);
  76. BytesMessage msg = createBytesMsg(data, offset, length);
  77. System.err.println(Thread.currentThread().getName()+"发送消息");
  78. // 发送消息
  79. messageProducer.send(msg);
  80. } catch (JMSException e) {
  81. return false;
  82. }
  83. return false;
  84. }
  85. public void receive(String topicName) throws JMSException {
  86. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  87. Topic topic =session.createTopic(topicName);
  88. MessageConsumer consumer=session.createConsumer(topic);
  89. consumer.setMessageListener(new MessageListener() {
  90. @Override
  91. public void onMessage(Message message) {
  92. BytesMessage msg=(BytesMessage) message;
  93. System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());
  94. }
  95. });
  96. }
  97. /**
  98. * 创建字节数组消息
  99. *
  100. * @param data
  101. * @param offset
  102. * @param length
  103. * @return
  104. * @throws JMSException
  105. */
  106. private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
  107. BytesMessage msg = session.createBytesMessage();
  108. msg.writeBytes(data, offset, length);
  109. return msg;
  110. }
  111. /**
  112. * 创建对象序列化消息
  113. * @param obj
  114. * @return
  115. * @throws JMSException
  116. */
  117. private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
  118. // MapMessage msg = session.createMapMessage();//key-value形式的消息
  119. ObjectMessage msg = session.createObjectMessage(obj);
  120. return msg;
  121. }
  122. /**
  123. * 创建字符串消息
  124. * @param text
  125. * @return
  126. * @throws JMSException
  127. */
  128. private TextMessage createTextMsg(String text) throws JMSException {
  129. TextMessage msg = session.createTextMessage(text);
  130. return msg;
  131. }
  132. /**
  133. * 获取创建者
  134. *
  135. * @param name -名称(主题名称和队列名称)
  136. * @param type -类型(true:topic,false:queue)
  137. * @return
  138. * @throws JMSException
  139. */
  140. private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
  141. return type?getTopicProducer(name):getQueueProducer(name);
  142. }
  143. /**
  144. * 创建或获取队列
  145. * @param queueName
  146. * @return
  147. * @throws JMSException
  148. */
  149. private MessageProducer getQueueProducer(String queueName) throws JMSException {
  150. MessageProducer messageProducer = null;
  151. if ((messageProducer = queueThreadLocal.get()) == null) {
  152. Queue queue = session.createQueue(queueName);
  153. messageProducer = session.createProducer(queue);
  154. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
  155. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  156. queueThreadLocal.set(messageProducer);
  157. }
  158. return messageProducer;
  159. }
  160. /**
  161. * 创建或获取主题
  162. * @param topicName
  163. * @return
  164. * @throws JMSException
  165. */
  166. private MessageProducer getTopicProducer(String topicName) throws JMSException {
  167. MessageProducer messageProducer = null;
  168. if ((messageProducer = topicThreadLocal.get()) == null) {
  169. Topic topic = session.createTopic(topicName);
  170. messageProducer = session.createProducer(topic);
  171. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
  172. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  173. topicThreadLocal.set(messageProducer);
  174. }
  175. return messageProducer;
  176. }
  177. public String getPassword() {
  178. return password;
  179. }
  180. public void setPassword(String password) {
  181. this.password = password;
  182. }
  183. @Override
  184. public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
  185. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  186. Queue topic =session.createQueue(queueName);
  187. MessageConsumer consumer=session.createConsumer(topic);
  188. consumer.setMessageListener(listener);
  189. }
  190. @Override
  191. public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
  192. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  193. Topic topic =session.createTopic(topicName);
  194. MessageConsumer consumer=session.createConsumer(topic);
  195. consumer.setMessageListener(listener);
  196. }

4、测试一下Topic和Queue

  1. public static void main(String[] args) throws JMSException{
  2. //如果创建失败会立即抛出异常
  3. MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
  4. Test testMq = new Test();
  5. try {
  6. Thread.sleep(1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. //Thread 1
  11. new Thread(testMq.new ProductorMq(producter)).start();
  12. //Thread 2
  13. new Thread(testMq.new ProductorMq(producter)).start();
  14. //Thread 3
  15. new Thread(testMq.new ProductorMq(producter)).start();
  16. //Thread 4
  17. new Thread(testMq.new ProductorMq(producter)).start();
  18. //Thread 5
  19. new Thread(testMq.new ProductorMq(producter)).start();
  20. //Thread 6
  21. new Thread(testMq.new ProductorMq(producter)).start();
  22. //订阅接收线程Thread 1
  23. new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. try {
  27. producter.receiveTopic("eguid-topic",new MessageListener() {
  28. @Override
  29. public void onMessage(Message message) {
  30. BytesMessage msg=(BytesMessage) message;
  31. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
  32. }
  33. });
  34. } catch (JMSException e) {
  35. // TODO Auto-generated catch block
  36. e.printStackTrace();
  37. }
  38. }
  39. }).start();
  40. //订阅接收线程Thread 2
  41. new Thread(new Runnable() {
  42. @Override
  43. public void run() {
  44. try {
  45. producter.receiveTopic("eguid-topic",new MessageListener() {
  46. @Override
  47. public void onMessage(Message message) {
  48. BytesMessage msg=(BytesMessage) message;
  49. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
  50. }
  51. });
  52. } catch (JMSException e) {
  53. // TODO Auto-generated catch block
  54. e.printStackTrace();
  55. }
  56. }
  57. }).start();
  58. //队列消息生产线程Thread-1
  59. new Thread(testMq.new QueueProductor(producter)).start();
  60. //队列消息生产线程Thread-2
  61. new Thread(testMq.new QueueProductor(producter)).start();
  62. //队列接收线程Thread 1
  63. new Thread(new Runnable() {
  64. @Override
  65. public void run() {
  66. try {
  67. producter.receiveQueue("eguid-queue",new MessageListener() {
  68. @Override
  69. public void onMessage(Message message) {
  70. BytesMessage msg=(BytesMessage) message;
  71. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
  72. }
  73. });
  74. } catch (JMSException e) {
  75. // TODO Auto-generated catch block
  76. e.printStackTrace();
  77. }
  78. }
  79. }).start();
  80. //队列接收线程Thread2
  81. new Thread(new Runnable() {
  82. @Override
  83. public void run() {
  84. try {
  85. producter.receiveQueue("eguid-queue",new MessageListener() {
  86. @Override
  87. public void onMessage(Message message) {
  88. BytesMessage msg=(BytesMessage) message;
  89. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
  90. }
  91. });
  92. } catch (JMSException e) {
  93. // TODO Auto-generated catch block
  94. e.printStackTrace();
  95. }
  96. }
  97. }).start();
  98. }
  99. private class ProductorMq implements Runnable{
  100. Jtt809MsgProducter producter;
  101. public ProductorMq(Jtt809MsgProducter producter){
  102. this.producter = producter;
  103. }
  104. @Override
  105. public void run() {
  106. while(true){
  107. try {
  108. String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
  109. producter.sendTopic("eguid-topic",wang.getBytes());
  110. Thread.sleep(2000);
  111. } catch (InterruptedException e) {
  112. e.printStackTrace();
  113. }
  114. }
  115. }
  116. }
  117. private class QueueProductor implements Runnable{
  118. Jtt809MsgProducter producter;
  119. public QueueProductor(Jtt809MsgProducter producter){
  120. this.producter = producter;
  121. }
  122. @Override
  123. public void run() {
  124. while(true){
  125. try {
  126. String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
  127. producter.sendQueue("eguid-queue",eguid.getBytes());
  128. Thread.sleep(2000);
  129. } catch (InterruptedException e) {
  130. e.printStackTrace();
  131. }
  132. }
  133. }
  134. }

-------------------End--------------------