并发框架Disruptor场景应用

时间:2021-06-11 07:23:05
今天用一个停车场问题来加深对Disruptor的理解。一个有关汽车进入停车场的问题。当汽车进入停车场时,系统首先会记录汽车信息。同时也会发送消息到其他系统处理相关业务,最后发送短信通知车主收费开始。看了很多文章,里面的代码都是大同小异的,可能代码真的是很经典。以下代码也是来源网络,只是自己手动敲的,加了一些注释。
 
代码包含以下内容:
1) 事件对象Event
2)三个消费者Handler
3)一个生产者Processer
4)执行Main方法
Event类:汽车信息
  1.  
    public class MyInParkingDataEvent {
  2.  
     
  3.  
    private String carLicense; // 车牌号
  4.  
     
  5.  
    public String getCarLicense() {
  6.  
    return carLicense;
  7.  
    }
  8.  
     
  9.  
    public void setCarLicense(String carLicense) {
  10.  
    this.carLicense = carLicense;
  11.  
    }
  12.  
     
  13.  
    }

Handler类:一个负责存储汽车数据,一个负责发送kafka信息到其他系统中,最后一个负责给车主发短信通知

  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
    import com.lmax.disruptor.WorkHandler;
  3.  
     
  4.  
    /**
  5.  
    * Handler 第一个消费者,负责保存进场汽车的信息
  6.  
    *
  7.  
    */
  8.  
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{
  9.  
     
  10.  
    @Override
  11.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
  12.  
    long threadId = Thread.currentThread().getId(); // 获取当前线程id
  13.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
  14.  
    System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  19.  
    throws Exception {
  20.  
    this.onEvent(myInParkingDataEvent);
  21.  
    }
  22.  
     
  23.  
    }
  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
     
  3.  
    /**
  4.  
    * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
  5.  
    */
  6.  
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{
  7.  
     
  8.  
    @Override
  9.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  10.  
    throws Exception {
  11.  
    long threadId = Thread.currentThread().getId(); // 获取当前线程id
  12.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
  13.  
    System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
  14.  
    }
  15.  
     
  16.  
    }
  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
     
  3.  
    /**
  4.  
    * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
  5.  
    */
  6.  
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
  7.  
     
  8.  
    @Override
  9.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  10.  
    throws Exception {
  11.  
    long threadId = Thread.currentThread().getId(); // 获取当前线程id
  12.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
  13.  
    System.out.println(String.format("Thread Id %s 给 %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
  14.  
    }
  15.  
     
  16.  
    }

Producer类:负责上报停车数据

  1.  
    import java.util.concurrent.CountDownLatch;
  2.  
    import com.lmax.disruptor.EventTranslator;
  3.  
    import com.lmax.disruptor.dsl.Disruptor;
  4.  
     
  5.  
    /**
  6.  
    * 生产者,进入停车场的车辆
  7.  
    */
  8.  
    public class MyInParkingDataEventPublisher implements Runnable{
  9.  
     
  10.  
    private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
  11.  
    private Disruptor<MyInParkingDataEvent> disruptor;
  12.  
    private static final Integer NUM = 1; // 1,10,100,1000
  13.  
     
  14.  
    public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
  15.  
    Disruptor<MyInParkingDataEvent> disruptor) {
  16.  
    this.countDownLatch = countDownLatch;
  17.  
    this.disruptor = disruptor;
  18.  
    }
  19.  
     
  20.  
    @Override
  21.  
    public void run() {
  22.  
    MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
  23.  
    try {
  24.  
    for(int i = 0; i < NUM; i ++) {
  25.  
    disruptor.publishEvent(eventTranslator);
  26.  
    Thread.sleep(1000); // 假设一秒钟进一辆车
  27.  
    }
  28.  
    } catch (InterruptedException e) {
  29.  
    e.printStackTrace();
  30.  
    } finally {
  31.  
    countDownLatch.countDown(); // 执行完毕后通知 await()方法
  32.  
    System.out.println(NUM + "辆车已经全部进入进入停车场!");
  33.  
    }
  34.  
    }
  35.  
     
  36.  
    }
  37.  
     
  38.  
    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
  39.  
     
  40.  
    @Override
  41.  
    public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
  42.  
    this.generateData(myInParkingDataEvent);
  43.  
    }
  44.  
     
  45.  
    private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
  46.  
    myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int)(Math.random() * 100000)); // 随机生成一个车牌号
  47.  
    System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
  48.  
    return myInParkingDataEvent;
  49.  
    }
  50.  
     
  51.  
    }

执行的Main方法:

  1.  
    import com.lmax.disruptor.EventFactory;
  2.  
    import com.lmax.disruptor.YieldingWaitStrategy;
  3.  
    import com.lmax.disruptor.dsl.Disruptor;
  4.  
    import com.lmax.disruptor.dsl.EventHandlerGroup;
  5.  
    import com.lmax.disruptor.dsl.ProducerType;
  6.  
     
  7.  
    /**
  8.  
    * 执行的Main方法 ,
  9.  
    * 一个生产者(汽车进入停车场);
  10.  
    * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
  11.  
    * 前两个消费者同步执行,都有结果了再执行第三个消费者
  12.  
    */
  13.  
    public class MyInParkingDataEventMain {
  14.  
     
  15.  
    public static void main(String[] args) {
  16.  
    long beginTime=System.currentTimeMillis();
  17.  
    int bufferSize = 2048; // 2的N次方
  18.  
    try {
  19.  
    // 创建线程池,负责处理Disruptor的四个消费者
  20.  
    ExecutorService executor = Executors.newFixedThreadPool(4);
  21.  
     
  22.  
    // 初始化一个 Disruptor
  23.  
    Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
  24.  
    @Override
  25.  
    public MyInParkingDataEvent newInstance() {
  26.  
    return new MyInParkingDataEvent(); // Event 初始化工厂
  27.  
    }
  28.  
    }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
  29.  
     
  30.  
    // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
  31.  
    EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
  32.  
    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
  33.  
     
  34.  
    // 当上面两个消费者处理结束后在消耗 smsHandler
  35.  
    MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
  36.  
    handlerGroup.then(myParkingDataSmsHandler);
  37.  
     
  38.  
    // 启动Disruptor
  39.  
    disruptor.start();
  40.  
     
  41.  
    CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
  42.  
    // 生产者生成数据
  43.  
    executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
  44.  
    countDownLatch.await(); // 等待生产者结束
  45.  
     
  46.  
    disruptor.shutdown();
  47.  
    executor.shutdown();
  48.  
    } catch (Exception e) {
  49.  
    e.printStackTrace();
  50.  
    }
  51.  
     
  52.  
    System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
  53.  
    }
  54.  
     
  55.  
    }

--------------------- 本文来自 ITDragon龙 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/qq_19558705/article/details/77247912?utm_source=copy