day4 java消息中间件服务

时间:2021-03-12 17:29:22
关于ActiveMQ、RocketMQ、RabbitMQ、Kafka一些总结和区别
ActiveMQ之消息服务器平台(发邮件)!!!!
day4  java消息中间件服务

day4  java消息中间件服务

PS: 讲个故事,老王要给他的两个女儿讲故事,他要一个一个讲很费劲,后来他使用了微信公众号,让订阅微信公众号的人关注就减轻了负担。

day4  java消息中间件服务

day4  java消息中间件服务

PS: 传统的如果一个用户进行登录,会调用分多的服务,如果没有消息中间件等待的时间就会很长(这样同步的效率很低),有了消息中间件首先

能有异步的保证登录,然后还能保证服务不会被一个一个执行

day4  java消息中间件服务

PS:生活中的应用

day4  java消息中间件服务
day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

PS: kafka性能高,但是数据会丢失

rabbitmp保证数据不丢失,性能比activemq强

activeMq满足80%以上的业务场景

关于消息队列的介绍----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

PS: JMS是一种规范,不同的公司有不同的实现,其中最常用的是ActiveMQ, 类似于JDBC感觉
队列是一两万,消息流量几十万条也是可以使用的
day4  java消息中间件服务

day4  java消息中间件服务

PS: 也就是对方不在的时候也可以发送


day4  java消息中间件服务

day4  java消息中间件服务

PS: JMS规定点对点 和 发布和订阅模式,  如果是点对点的模式就把数据扔到 Queue中,如果是 发布订阅就发送到topic中;
消息中间件可以理解为 数据库,通过ConnecttionFactory连接,从而获得一个Connecttion,然后把消息发送到指定的目的地

day4  java消息中间件服务

--------------------------------------------------------------------------------------------

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

PS:这是Java Message Service的体系。

PS:体系架构是重点,分为消费者和发送者两者。统称为客户
PS: 消息的两种模式 : 队列和话题

day4  java消息中间件服务

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,

PS :要用这个老版本的;            后来学习的时候使用5.11.1

day4  java消息中间件服务

修改配置文件activeMQ.xml,将0.0.0.0修改为localhost

<transportConnectors>

<transportConnector name="openwire" uri="tcp://localhost:61616"/>

<transportConnector name="ssl"     uri="ssl://localhost:61617"/>

<transportConnector name="stomp"   uri="stomp://localhost:61613"/>

<transportConnector uri="http://localhost:8081"/>

<transportConnector uri="udp://localhost:61618"/>

day4  java消息中间件服务

然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

day4  java消息中间件服务

PS:不会的话,直接看百度怎么配置JMS
1.安装包2.修改配置 3.bat启动 4.执行代码看业务流程 PS :安全机制主要是配置用户

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

PS: 配置好的话,会自动创建三个表,处理完数据以后自动会清空

day4  java消息中间件服务

day4  java消息中间件服务

PS: 事物的设置主要在发送端, 最后一种提交方式用的比较少

day4  java消息中间件服务


------------------------------------------------------------------------------------------------------------------------------------------------------

day4  java消息中间件服务

PS:JMS用在J2EE生产环境中,kafka用在大数据开发环境中 。 都是消息队列的处理。。。。
PS:比如上图,在web系统接收数据,然后会生成相应的日志文件,应用kafka读取数据,然后再执行相应的业务处理。

day4  java消息中间件服务

package cn.itcast_03_mq.topic;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "mytopic";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
package cn.itcast_03_mq.topic;
import java.util.Random; import javax.jms.JMSException; public class ProducerTest { /**
* @param args
*/
public static void main(String[] args) throws JMSException, Exception {
ProducerTool producer = new ProducerTool();
Random random = new Random();
for(int i=0;i<20;i++){ Thread.sleep(random.nextInt(10)*1000); producer.produceMessage("Hello, world!--"+i);
producer.close();
} }
}
day4  java消息中间件服务
package cn.itcast_03_mq.topic;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener,ExceptionListener {
//这些都是体系架构的东西
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url =ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "mytopic";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
public static Boolean isconnection=false;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url); //创建工厂
connection = connectionFactory.createConnection(); //创建链接
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic(subject);
consumer = session.createConsumer(destination);
} // 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
consumer.setMessageListener(this); //监听消息
connection.setExceptionListener(this);//异常监听
isconnection=true;
System.out.println("Consumer:->Begin listening...");
// 开始监听
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} public void onException(JMSException arg0) {
isconnection=false;
}
}
package cn.itcast_03_mq.topic;

import javax.jms.JMSException;

public class ConsumerTest implements Runnable {
static Thread t1 = null; /**
* @param args
* @throws InterruptedException
* @throws InterruptedException
* @throws JMSException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException { t1 = new Thread(new ConsumerTest());
t1.setDaemon(false);
t1.start();
/**
* 如果发生异常,则重启consumer
*/
/*while (true) {
System.out.println(t1.isAlive());
if (!t1.isAlive()) {
t1 = new Thread(new ConsumerTest());
t1.start();
System.out.println("重新启动");
}
Thread.sleep(5000);
}*/
// 延时500毫秒之后停止接受消息
// Thread.sleep(500);
// consumer.close();
} public void run() {
try {
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
while (ConsumerTool.isconnection) {
}
} catch (Exception e) {
} }
}
day4  java消息中间件服务
 
PS:此为程序启动以后监听的效果
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/All_Downloads/BaiduYunDownload/chuanzhiboke%e5%a4%a7%e6%95%b0%e6%8d%ae/%e8%a7%86%e9%a2%91/day04%e5%b9%b6%e5%8f%91/day04/%e4%bb%a3%e7%a0%81/1-cloudDay04/lib/activemq-all-5.9.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/All_Downloads/BaiduYunDownload/chuanzhiboke%e5%a4%a7%e6%95%b0%e6%8d%ae/%e8%a7%86%e9%a2%91/day04%e5%b9%b6%e5%8f%91/day04/%e4%bb%a3%e7%a0%81/1-cloudDay04/lib/slf4j-simple-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2017-11-16 19:12:57,790 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport.doReconnect(FailoverTransport.java:1040) method:doReconnect Successfully connected to tcp://localhost:61616
Consumer:->Begin listening...
Consumer:->Received: Hello, world!--0
Consumer:->Received: Hello, world!--1
Consumer:->Received: Hello, world!--2
Consumer:->Received: Hello, world!--3
Consumer:->Received: Hello, world!--4
Consumer:->Received: Hello, world!--5
Consumer:->Received: Hello, world!--6
Consumer:->Received: Hello, world!--7
Consumer:->Received: Hello, world!--8
Consumer:->Received: Hello, world!--9
Consumer:->Received: Hello, world!--10
Consumer:->Received: Hello, world!--11
Consumer:->Received: Hello, world!--12
Consumer:->Received: Hello, world!--13
Consumer:->Received: Hello, world!--14
Consumer:->Received: Hello, world!--15
Consumer:->Received: Hello, world!--16
Consumer:->Received: Hello, world!--17
Consumer:->Received: Hello, world!--18
Consumer:->Received: Hello, world!--19

PS :总结

PS: activemp只是用在javaee中,在现实生产中目前都用kafka了

day4  java消息中间件服务

PS:JMS用在J2EE生产环境中,kafka用在大数据开发环境中 。 都是消息队列的处理。。。。
PS:比如上图,在web系统接收数据,然后会生成相应的日志文件,应用kafka读取数据,然后再执行相应的业务处理。
下面介绍spring Jms理论
day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

步骤

1.配置spring application.xml

 day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务

PS :动态可以拓展

day4  java消息中间件服务

day4  java消息中间件服务

day4  java消息中间件服务