【bird-java】分布式服务间的事件总线EventBus

时间:2022-09-29 08:43:41

什么是EventBus
EventBus是对发布-订阅模式的一种实现。其以一种非常优雅的方式实现了组件间的解耦与通信,在Android开发、DDD等领域都有非常广泛的应用。
【bird-java】分布式服务间的事件总线EventBus

事件流大致如下:

  • Producer向EventBus发送事件。
  • EventBus向所有监听了该事件的Consumer推送事件。
  • 监听了该事件的Consumer消费事件。

注:一个组件即可以是Producer,也可以是Consumer。

分布式服务间的EventBus
在分布式系统中,事件在服务之间的传递要比单机EventBus复杂很多。有没有一种适用于分布式服务之间的,并且事件传递就像单机一样简单的EventBus呢?在GitHub上搜索了JAVA实现的EventBus,排名前十的几乎都是用于Android或JAVA的单机事件总线。良久之后...还是自己动手吧。集群环境下的EventBus比单机版需要多考虑一些问题,比如:

  1. 服务集群部署的情况下,如何保证每个集群均可订阅该事件,且每个集群只能消费一次该事件。
  2. 如何实现一个服务内部多个`xxxService`订阅同一事件。

解决方案:

  1. 使用`kafka`实现集群间的发布订阅(基于`topic`),同一集群处于同一个kafka的consumer-group来保证每个集群只会消费一次该事件。
  2. 服务在启动时可反射获得所有实现了`IEventHandler<TEventArg>`的类并缓存,服务消费消息时获取所有注册了该消息的handler并调用其`HandleEvent`方法。

【bird-java】分布式服务间的事件总线EventBus

部分关键源码

1、事件消息的定义

public abstract class EventArg implements IEventArg{

    private Date eventTime;

    public EventArg(){
eventTime = new Date();
} public Date getEventTime() {
return eventTime;
} public void setEventTime(Date eventTime) {
this.eventTime = eventTime;
}
}

事件消息默认记录创建时间,可扩展其他字段,比如发送时间、标识等。

2、使用spring-kafka发送消息

/**
* kafka事件注册器,向kafka队列中push消息
*/
@Component
public class KafkaRegister implements IEventRegister { @Autowired(required = false)
private KafkaTemplate<String,IEventArg> kafkaTemplate; /**
* 事件注册
*
* @param eventArg 事件参数
*/
@Override
public void regist(IEventArg eventArg) {
kafkaTemplate.send(getTopic(eventArg),eventArg);
} /**
* 获取kafka的topic
*
*
* @param eventArg
* @return topic
*/
private String getTopic(IEventArg eventArg){
return eventArg.getClass().getName();
}
}

3、消费kafka消息并执行当前服务中所有订阅了该消息的事件

/**
* kafka事件监听器
*/
public class KafkaEventArgListener implements MessageListener<String,EventArg> { @Autowired
private IEventHandlerFactory eventHandlerFactory; @Override
public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) {
if (consumerRecord == null) return;
EventArg value = consumerRecord.value(); Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value);
if (handlers == null) return;
for (IEventHandler handler : handlers) {
handler.HandleEvent(value);
}
}
}

EventBus的使用

1、事件的定义。所有事件均继承于上文EventArg抽象类,示例如下:

public class TestEventArg extends EventArg{
private String value; public String getValue() {
return value;
} public void setValue(String value) {
this.value = value;
}
}

2、事件发布。示例代码:

eventBus.push(new TestEventArg());

3、事件订阅。一个服务发布事件之后,任何服务中的任何`xxxServiceImpl`类均可订阅该事件,实现`IEventHandler<TEventArg>`接口即可完成事件的订阅,示例如下:

public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> {

    @Override
public void HandleEvent(TestEventArg eventArg) {
System.out.println("notify zero======");
}
}

整体来说,使用还是很简单的,EventBus实现与使用示例收录于bird-java项目中,项目地址:https://github.com/liuxx001/bird-java

【bird-java】分布式服务间的事件总线EventBus的更多相关文章

  1. vue*事件总线eventBus的简单理解和使用

    公共事件总线eventBus的实质就是创建一个vue实例,通过一个空的vue实例作为桥梁实现vue组件间的通信.它是实现非父子组件通信的一种解决方案. 用法如下: 第一步:项目中创建一个js文件(我通 ...

  2. ASP&period;NET Core基于微软微服务eShopOnContainer事件总线EventBus的实现

    这个EventBus的实现是基于微软微服务https://github.com/dotnet-architecture/eShopOnContainers项目的,我把它从项目中抽离出来,打包成nuge ...

  3. java分布式服务框架Dubbo的介绍与使用

    1. Dubbo是什么? Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案.简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需 ...

  4. 转载:java分布式服务框架Dubbo的介绍与使用

    1. Dubbo是什么? Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案.简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需 ...

  5. Java分布式服务框架Dubbo初探(待实践)

    Dubbo是什么? Dubbo[]是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案. 其核心部分包含: 远程通讯: 提供对多种基于长连接的NIO框架抽象封 ...

  6. 事件总线 EventBus

    661. .net中事件模型很优雅的实现了观察者模式,同时被大量的使用在各种框架中. [2016-04-30 10:52:42]662. Prism框架中实现了一个典型的EventAggregator ...

  7. Guava&colon; 事件总线EventBus

    EventBus 直译过来就是事件总线,它使用发布订阅模式支持组件之间的通信,不需要显式地注册回调,比观察者模式更灵活,可用于替换Java中传统的事件监听模式,EventBus的作用就是解耦,它不是通 ...

  8. 自己动手写事件总线&lpar;EventBus&rpar;

    本文由云+社区发表 事件总线核心逻辑的实现. EventBus的作用 Android中存在各种通信场景,如Activity之间的跳转,Activity与Fragment以及其他组件之间的交互,以及在某 ...

  9. C&num; 事件总线 EventBus

    1. 引言 事件总线这个概念对你来说可能很陌生,但提到观察者(发布-订阅)模式,你也许就很熟悉.事件总线是对发布-订阅模式的一种实现.它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需 ...

随机推荐

  1. 51nod 1117 聪明的木匠 &lpar;哈夫曼树&rpar;

    题目:传送门. 题意:中文题. 题解:就是构造一颗哈夫曼树,数据结构里的知识. #include <iostream> #include <cstdio> #include & ...

  2. JavaScript设计模式——前奏(封装和信息隐藏)

    前面一篇讲了js设计模式的前奏,包括接口的讲解.. 三:封装和信息隐藏: 信息隐藏用来进行解耦,定义一些私有的数据和方法. 封装是用来实现信息隐藏的技术,通过闭包实现私有数据的定义和使用. 接口在这其 ...

  3. &lbrack;codeforces 241&rsqb;C&period; Mirror Box

    [codeforces 241]C. Mirror Box 试题描述 Mirror Box is a name of a popular game in the Iranian National Am ...

  4. ListView往TreView里面拖拽

    ListView往TreView里面拖拽       unit Unit1; interface uses   Windows, Messages, SysUtils, Variants, Class ...

  5. Linux iostat监测IO状态&lpar;转&rpar;

    Linux iostat监测IO状态 2010-03-1  |  13:13分类:Linux,技术细节  |  标签:Linux  |  53,945 views Linux系统出现了性能问题,一般我 ...

  6. EhLib DBGridEh组件在Delphi中应用全攻略总结(转)

    EhLib DBGridEh组件在Delphi中应用全攻略总结(转) http://blog.sina.com.cn/s/blog_94b1b40001013xn0.html 优化SQL查询:如何写出 ...

  7. css常用居中

    对一个已知大小的元素上下左右居中(已知大小了,直接margin也就行了): css如下:.parent{height:100px;width:100px;background:grey;positio ...

  8. ABP官方文档翻译 2&period;5 设置管理

    设置管理 介绍 关于 ISettingStore 定义设置 设置范围 重写设置定义 获取设置值 服务端 客户端 更改设置 关于缓存 介绍 每个应用都需要存储设置,并且在应用的某些地方需要使用这些设置. ...

  9. edgedb 内部pg 数据存储的探索 &lpar;一&rpar;基本环境搭建

    edgedb 是基于pg 上的对象关系数据库,已经写过使用docker 运行的demo,为了探索内部的原理,做了一下尝试,开启pg 访问 后边会进一步的学习 环境准备 为了测试,使用yum 安装 安装 ...

  10. Linux端口映射,80端口映射到8080端口

    iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80 -j REDIRECT --to-port 8080 其中eth0为外网网卡名称 ipt ...