Java语言快速实现简单MQ消息队列服务

时间:2021-05-18 01:04:54

MQ基础回顾

在上一篇消息通讯之关于消息队列MQ必须了解的相关概念中 , 我们尽可能地详细的了解了一些关于MQ (消息队列) 的相关概念,并且我们上一篇中提到一个最基本的MQ通讯模型如下所示,所以本章节使用JAVA语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)

Java语言快速实现简单MQ消息队列服务

主要角色

首先我们必须需要搞明白MQ (消息队列) 中的三个基本角色

  • Producer : 消息生产者,负责生产消息并发送到Broker
  • Broker : 消息处理中心,负责接受消息,存储消息,转发消息
  • Consumer :消息消费者,负责消费消息

整体架构如下所示

Java语言快速实现简单MQ消息队列服务

自定义协议

首先从上一篇中介绍了协议的相关信息,具体厂商的MQ(消息队列)需要遵循某种协议或者自定义协议 , 消息的生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息,所以在这里我们自定义一个协议如下.

消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

消息生产者:需要遵循协议将生产的消息头部增加 "SEND:" 表示生产消息

消息消费者:需要遵循协议向消息处理中心发送 "CONSUME"字符串表示消费消息

流程顺序

项目构建流程

下面将整个MQ的构建流程过一遍

  1. 新建一个Broker类,内部维护一个ArrayBlockingQueue队列,提供生产消息和消费消息的方法,仅仅具备存储服务功能
  2. 新建一个BrokerServer类,将Broker发布为服务到本地9999端口,监听本地9999端口的Socket链接,在接受的信息中进行我们的协议校验, 这里仅仅具备接受消息,校验协议,转发消息功能;
  3. 新建一个MqClient类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法
  4. 测试:新建两个MyClient类对象,分别执行其生产方法和消费方法

具体使用流程

  1. 生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的ArrayBlockingQueue队列中.如果ArrayBlockingQueue队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.
  2. 消息消息:客户端执行消费消息方法,Broker服务会校验请求的信息的信息是否等于CONSUME,如果验证成功则从Broker内部维护的ArrayBlockingQueue队列的Poll出一个消息返回给客户端

代码演示

消息处理中心 Broker

/**
* 消息处理中心
*/
public class Broker {
// 队列存储消息的最大数量
private final static int MAX_SIZE = 3; // 保存消息数据的容器
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(MAX_SIZE); // 生产消息
public static void produce(String msg) {
if (messageQueue.offer(msg)) {
System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
System.out.println("=======================");
} // 消费消息
public static String consume() {
String msg = messageQueue.poll();
if (msg != null) {
// 消费条件满足情况,从消息容器中取出一条消息
System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());
} else {
System.out.println("消息处理中心内没有消息可供消费!");
}
System.out.println("======================="); return msg;
} }

消息处理中心服务 BrokerServer

/**
* 用于启动消息处理中心
*/
public class BrokerServer implements Runnable { public static int SERVICE_PORT = 9999; private final Socket socket; public BrokerServer(Socket socket) {
this.socket = socket;
} @Override
public void run() {
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
)
{
while (true) {
String str = in.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始数据:" + str); if (str.equals("CONSUME")) { //CONSUME 表示要消费一条消息
//从消息队列中消费一条消息
String message = Broker.consume();
out.println(message);
out.flush();
} else if (str.contains("SEND:")){
//接受到的请求包含SEND:字符串 表示生产消息放到消息队列中
Broker.produce(str);
}else {
System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");
}
}
} catch (Exception e) {
e.printStackTrace();
}
} public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true) {
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}

客户端 MqClient

/**
* 访问消息队列的客户端
*/
public class MqClient { //生产消息
public static void produce(String message) throws Exception {
//本地的的BrokerServer.SERVICE_PORT 创建SOCKET
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
out.println(message);
out.flush();
}
} //消费消息
public static String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
//先向消息队列发送命令
out.println("CONSUME");
out.flush(); //再从消息队列获取一条消息
String message = in.readLine(); return message;
}
} }

测试MQ

public class ProduceClient {

    public static void main(String[] args) throws Exception {
MqClient client = new MqClient(); client.produce("SEND:Hello World");
} } public class ConsumeClient { public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
String message = client.consume(); System.out.println("获取的消息为:" + message);
}
}

我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

接收到原始数据:SEND:Hello World
成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:1
=======================
接收到原始数据:SEND:Hello World
成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:2
=======================
接收到原始数据:SEND:Hello World
成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:3
=======================
接收到原始数据:SEND:Hello World
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================
接收到原始数据:Hello World
原始数据:Hello World没有遵循协议,不提供相关服务 接收到原始数据:CONSUME
已经消费消息:SEND:Hello World,当前暂存的消息数量是:2
=======================
接收到原始数据:CONSUME
已经消费消息:SEND:Hello World,当前暂存的消息数量是:1
=======================
接收到原始数据:CONSUME
已经消费消息:SEND:Hello World,当前暂存的消息数量是:0
=======================
接收到原始数据:CONSUME
消息处理中心内没有消息可供消费!
=======================

小结

本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 "生产者,消费者,消费处理中心,协议"有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

Java语言快速实现简单MQ消息队列服务的更多相关文章

  1. IM开发基础知识补课&lpar;五&rpar;:通俗易懂,正确理解并用好MQ消息队列

    1.引言 消息是互联网信息的一种表现形式,是人利用计算机进行信息传递的有效载体,比如即时通讯网坛友最熟悉的即时通讯消息就是其具体的表现形式之一. 消息从发送者到接收者的典型传递方式有两种: 1)一种我 ...

  2. 手把手教你用redis实现一个简单的mq消息队列(java)

    众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有 ActiveMQ,RabbitMQ,Zero ...

  3. C&num; Queue与RabbitMQ的爱恨情仇(文末附源码):Q与MQ消息队列简单应用(二)

    上一章我们讲了队列( Queue),这一章我们讲Message Queue消息队列,简称MQ. 定义: MQ是MessageQueue,消息队列的简称(是流行的开源消息队列系统,利用erlang语言开 ...

  4. Java编程的逻辑 &lpar;61&rpar; - 内存映射文件及其应用 - 实现一个简单的消息队列

    本系列文章经补充和完善,已修订整理成书<Java编程的逻辑>,由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买,京东自营链接:http:/ ...

  5. 多维度对比5款主流分布式MQ消息队列,妈妈再也不担心我的技术选型了

    1.引言 对于即时通讯网来说,所有的技术文章和资料都在围绕即时通讯这个技术方向进行整理和分享,这一次也不例外.对于即时通讯系统(包括IM.消息推送系统等)来说,MQ消息中件间是非常常见的基础软件,但市 ...

  6. 使用Rabbit MQ消息队列

    使用Rabbit MQ消息队列 综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息 ...

  7. 阿里云ACE共创空间——MQ消息队列产品测试

    一.产品背景消息队列是阿里巴巴集团自主研发的专业消息中间件. 产品基于高可用分布式集群技术,提供消息订阅和发布.消息轨迹查询.定时(延时)消息.资源统计.监控报警等一系列消息云服务,是企业级互联网架构 ...

  8. 【阿里云产品公测】消息队列服务MQS java SDK 机器人应用初体验

    [阿里云产品公测]消息队列服务MQS java SDK 机器人应用初体验 作者:阿里云用户啊里新人   初体验 之 测评环境 由于MQS支持外网访问,因此我在本地做了一些简单测试(可能有些业余),之后 ...

  9. 高并发架构系列:MQ消息队列的12点核心原理总结

    消息队列已经逐渐成为分布式应用场景.内部通信.以及秒杀等高并发业务场景的核心手段,它具有低耦合.可靠投递.广播.流量控制.最终一致性 等一系列功能. 无论是 RabbitMQ.RocketMQ.Act ...

随机推荐

  1. SQL Server优化技巧之SQL Server中的&quot&semi;MapReduce&quot&semi;

    日常的OLTP环境中,有时会涉及到一些统计方面的SQL语句,这些语句可能消耗巨大,进而影响整体运行环境,这里我为大家介绍如何利用SQL Server中的”类MapReduce”方式,在特定的统计情形中 ...

  2. WPF笔记&lpar;2&period;4 Grid&rpar;——Layout

    原文:WPF笔记(2.4 Grid)--Layout 第一章已经简单介绍过这个容器,这一节详细介绍.Grid一般是用表格(Grid.Row 和Grid.Column )的,比StackPanel更细致 ...

  3. Vue 事件驱动和依赖追踪

    之前关于 Vue 数据绑定原理的一点分析,最近需要回顾,就顺便发到随笔上了 在之前实现一个自己的Mvvm中,用 setter 来观测model,将界面上所有的 viewModel 绑定到 model ...

  4. python13&lowbar;day4

    上周复习 1,python基础 2,基本数据类型 3,函数式编程 函数式编程.三元运行.内置函数.文件处理 容易出问题的点 函数默认返回值为none,对于列表字典,传入引用. 1 2 3 4 5 6 ...

  5. 17089 最大m子段和

    17089 最大m子段和 时间限制:1000MS  内存限制:65535K提交次数:0 通过次数:0 题型: 编程题   语言: G++;GCC;VC Description "最大m子段和 ...

  6. PAT1012&colon;The Best Rank

    1012. The Best Rank (25) 时间限制 400 ms 内存限制 65536 kB 代码长度限制 16000 B 判题程序 Standard 作者 CHEN, Yue To eval ...

  7. linux达人养成计划学习笔记(七)—— 用户登录查看命令

    一.查看用户登录信息 1.命令格式 w 2.命令结果 第一行信息是:系统当前时间     系统运行总时间     登录用户数量     一分钟/五分钟/十分钟的系统负载(越大越差) 二.who命令 1 ...

  8. 【译】第12节---数据注解-ConcurrencyCheck

    原文:http://www.entityframeworktutorial.net/code-first/concurrencycheck-dataannotations-attribute-in-c ...

  9. Java数组逆序排列

    //逆序排列原理 /* A: 数组逆序原理* a: 题目分析* 通过观察发现,本题目要实现原数组元素倒序存放操作.即原数组存储元素为{12,69,852,25,89,588},逆序后为原数组存储元素变 ...

  10. 阿里Dragonfly docker p2p 镜像分发试用

      阿里的Dragonfly p2p 镜像分发已经开源了,同时加入了cncf ,很给力 拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/alidr ...