Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

时间:2023-03-09 22:38:04
Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

首先声明:以下内容均是在网上找别人的博客综合学习而成的,可能会发现某些代码与其他博主的相同,由于参考的文章比较多,这里对你们表示感谢,就不一一列举,如果有侵权的地方,请通知我,我可以把该文章删除。

1、jms-xml Spring配置文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans
  3. xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:p="http://www.springframework.org/schema/p"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
  7. <bean id = "connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  8. <property name = "brokerURL" value = "tcp://localhost:61616"/>
  9. </bean>
  10. <bean id = "topicDestination" class="org.apache.activemq.command.ActiveMQTopic"
  11. autowire="constructor">
  12. <constructor-arg value="com.spring.xkey.jms.topic"/>
  13. </bean>
  14. <bean id="sendMessage" class="com.spring.xkey.jms.SendMessage">
  15. <property name="username" value="xkey"/>
  16. <property name="password" value="1234567890"/>
  17. </bean>
  18. <bean id = "jmsMessageConverter" class="com.spring.xkey.jms.JmsMessageConverter">
  19. <property name="sendMessage" ref="sendMessage"/>
  20. </bean>
  21. <!-- 创建JMS发送信息的模板的对象 -->
  22. <bean id = "jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  23. <property name="connectionFactory" ref="connectionFactory"/>
  24. <!--property name="defaultDestination" ref="topicDestination"/-->
  25. <property name="receiveTimeout" value="6000"/>
  26. <property name="messageConverter" ref="jmsMessageConverter"/>
  27. </bean>
  28. <bean id = "jmsMessageListener" class="com.spring.xkey.jms.JmsMessageListener">
  29. </bean>
  30. <bean id = "publisher" class="com.spring.xkey.jms.Publisher">
  31. <property name="jmsTemplate" ref="jmsTemplate"/>
  32. <property name="destinations" ref="topicDestination" />
  33. <property name="sendMessage" ref="sendMessage"/>
  34. </bean>
  35. <bean id = "consumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  36. <property name="connectionFactory" ref="connectionFactory"/>
  37. <property name="destination" ref="topicDestination" />
  38. <property name="messageListener" ref="jmsMessageListener" />
  39. </bean>
  40. </beans>

2、Listener代码

  1. package com.spring.xkey.jms;
  2. import java.util.Date;
  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. import org.apache.activemq.command.ActiveMQMapMessage;
  7. public class JmsMessageListener implements MessageListener {
  8. public void onMessage(Message message) {
  9. ActiveMQMapMessage msg = null;
  10. //System.out.println("ONMessage-----------------" + message.toString());
  11. try {
  12. if (message instanceof ActiveMQMapMessage) {
  13. msg = (ActiveMQMapMessage) message;
  14. String username = msg.getString("username");
  15. String password = msg.getString("password");
  16. System.out.println("Message::: "+username+", "+password);
  17. //              msg = (ActiveMQMapMessage) message;
  18. //              String sentDate = msg.getString("date");
  19. //              String reMessage = msg.getString("message");
  20. //              int sentCount = msg.getInt("count");
  21. //              System.out
  22. //                      .println("-------------New Message Arrival-----------"
  23. //                              + new Date());
  24. //              System.out.println("It's " + sentCount + " time From Darcy: "
  25. //                      + reMessage + "   ---Send time :" + sentDate);
  26. }
  27. } catch (JMSException e) {
  28. System.out.println("JMSException in onMessage(): " + e.toString());
  29. } catch (Throwable t) {
  30. System.out.println("Exception in onMessage():" + t.getMessage());
  31. }
  32. }
  33. }

3、Converter代码

  1. package com.spring.xkey.jms;
  2. import javax.jms.JMSException;
  3. import javax.jms.MapMessage;
  4. import javax.jms.Message;
  5. import javax.jms.Session;
  6. import org.springframework.jms.support.converter.MessageConversionException;
  7. import org.springframework.jms.support.converter.MessageConverter;
  8. public class JmsMessageConverter implements MessageConverter{
  9. private SendMessage sendMessage;
  10. public void setSendMessage(SendMessage sendMsg){
  11. this.sendMessage = sendMsg;
  12. }
  13. public Object fromMessage(Message message) throws JMSException,
  14. MessageConversionException {
  15. // TODO Auto-generated method stub
  16. MapMessage  mapmessage= (MapMessage)message;
  17. this.sendMessage.setUsername(mapmessage.getString("username"));
  18. this.sendMessage.setPassword(mapmessage.getString("password"));
  19. System.out.println("First");
  20. return sendMessage;
  21. }
  22. public Message toMessage(Object arg0, Session session) throws JMSException,
  23. MessageConversionException {
  24. // TODO Auto-generated method stub
  25. this.sendMessage = (SendMessage)arg0;
  26. MapMessage  mapmessage= (MapMessage) session.createMapMessage();
  27. mapmessage.setString("username", this.sendMessage.getUsername());
  28. mapmessage.setString("password", this.sendMessage.getPassword());
  29. System.out.println("Second");
  30. return mapmessage;
  31. }
  32. }

4、Publisher代码

  1. package com.spring.xkey.jms;
  2. import java.util.Scanner;
  3. import javax.jms.Destination;
  4. import org.springframework.jms.core.JmsTemplate;
  5. public class Publisher {
  6. private JmsTemplate template;
  7. private Destination[] destinations;
  8. private SendMessage sendMessage;
  9. public void chart()
  10. {
  11. boolean chart = true;
  12. int count = 0;
  13. while(chart)
  14. {
  15. count ++;
  16. Scanner cin=new Scanner(System.in);
  17. System.out.println("输入聊天内容,输入N停止聊天");
  18. String text=cin.nextLine();
  19. if(text.equals("N"))
  20. {
  21. chart = false;
  22. }
  23. System.out.println("我:"+text);
  24. sendChartMessage(count,text);
  25. }
  26. }
  27. public void sendMsgCon(){
  28. Scanner cin=new Scanner(System.in);
  29. String username = cin.nextLine();
  30. String password = cin.nextLine();
  31. this.sendMessage.setUsername(username);
  32. this.sendMessage.setPassword(password);
  33. sendConvertor(this.sendMessage);
  34. }
  35. public void sendConvertor(SendMessage sendMsg){
  36. template.convertAndSend(destinations[0],sendMsg);
  37. }
  38. protected void sendChartMessage(int count , String strMessage)
  39. {
  40. MyMessageCreator creator = new MyMessageCreator(count,strMessage);
  41. template.send(destinations[0], creator);
  42. }
  43. public JmsTemplate getJmsTemplate() {
  44. return template;
  45. }
  46. public void setJmsTemplate(JmsTemplate template) {
  47. this.template = template;
  48. }
  49. public Destination[] getDestinations() {
  50. return destinations;
  51. }
  52. public void setDestinations(Destination[] destinations) {
  53. this.destinations = destinations;
  54. }
  55. public void setSendMessage(SendMessage sendMsg){
  56. this.sendMessage = sendMsg;
  57. }
  58. public SendMessage getSendMessage(){
  59. return this.sendMessage;
  60. }
  61. }

5、SendMessage代码

  1. package com.spring.xkey.jms;
  2. public class SendMessage {
  3. private String username;
  4. private String password;
  5. public void setUsername(String user){
  6. this.username = user;
  7. }
  8. public void setPassword(String pass){
  9. this.password = pass;
  10. }
  11. public String getUsername(){
  12. return this.username;
  13. }
  14. public String getPassword(){
  15. return this.password;
  16. }
  17. }

6、Test类

    1. package com.spring.xkey.jms;
    2. import javax.jms.JMSException;
    3. import org.springframework.context.ApplicationContext;
    4. import org.springframework.context.support.ClassPathXmlApplicationContext;
    5. import org.springframework.jms.listener.DefaultMessageListenerContainer;
    6. public class Test {
    7. /**
    8. * @param args
    9. */
    10. public static void main(String[] args) {
    11. // TODO Auto-generated method stub
    12. ApplicationContext context =
    13. new ClassPathXmlApplicationContext("jms.xml");
    14. /**Sender sender = (Sender)context.getBean("sender");
    15. sender.SendInfo();
    16. Receiver receiver = (Receiver)context.getBean("receiver");
    17. try {
    18. System.out.println(receiver.receiverInfo());
    19. } catch (JMSException e) {
    20. // TODO Auto-generated catch block
    21. e.printStackTrace();
    22. }*/
    23. Publisher pub = (Publisher)context.getBean("publisher");
    24. DefaultMessageListenerContainer consumer =
    25. (DefaultMessageListenerContainer)context.getBean("consumer");
    26. consumer.start();
    27. pub.sendMsgCon();
    28. //pub.chart();
    29. }
    30. }