[Java EE] 多线程(五):单例模式与阻塞队列-2. 阻塞队列

时间:2024-05-04 07:13:09

2.1 什么是阻塞队列

阻塞队列是⼀种特殊的队列.也遵守"先进先出"的原则.
阻塞队列能是⼀种线程安全的数据结构(但是像我们前面学习的普通队列和优先级队列都是线程不安全的),并且具有以下特性:
• 当队列满的时候,继续⼊队列就会阻塞,直到有其他线程从队列中取⾛元素.
• 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插⼊元素.
阻塞队列的⼀个典型应⽤场景就是"⽣产者消费者模型".这是⼀种⾮常典型的开发模型.

拓展:消息队列
还有一种队列,叫做消息队列,首先通过topic这样的参数,对数据进行归类,出队列的时候,指定topic,每个topic下的所有数据先进先出,消息队列往往也带有阻塞特性.由于这种队列实在是太好用了,在部署服务器的时候,一般对这种队列会进行单独部署.

2.2 标准库中的阻塞队列

在Java标准库中内置了阻塞队列.如果我们需要在⼀些程序中使⽤阻塞队列.直接使⽤标准库中的即
可.
• BlockingQueue是⼀个接⼝.真正实现的类是LinkedBlockingQueue / ArrayBlockingQueue / PriorityBlockingQueue.
put方法用于阻塞式的入队列,take用于阻塞式的出队列.
• BlockingQueue也有offer,poll,peek等⽅法,但是这些⽅法不带有阻塞特性.

2.3 生产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题
⽣产者和消费者彼此之间不直接通讯, 而通过阻塞队列来进行通讯所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取.
在这里插入图片描述
生产者和消费者模型有以下好处:

  1. 阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒.(削峰填谷)
    场景: 天猫11·11秒杀商品,在双十一的零点,阿里巴巴的服务器会接收到大量的支付请求,但是这样请求又是复杂的,很有可能服务器就被这样的一波请求冲垮了,这时候便会用到阻塞队列,让服务器慢慢处理这些指令.
    这样做可以有效进⾏"削峰",防⽌服务器被突然到来的⼀波请求直接冲垮.
    在这里插入图片描述

  2. 阻塞队列也能使⽣产者和消费者之间解耦.
    如果直接连接,服务器A和服务器B之间的逻辑关联会非常强,如果A或者B出现了bug或者出现了修改,另一个服务器就会被牵连到.这样的耦合性就比较高.我们希望通过阻塞队列来降低耦合性.让两个服务器和阻塞队列关联.

举例:过年包饺子
有请助教:空,荧
包饺子需要两步,擀皮,包馅,假如荧擀皮(生产者),空包馅(消费者),荧擀完皮会放在他们中间的盖帘上(阻塞队列).
在这里插入图片描述
假如荧擀皮慢,空包馅快,空在盖帘空的时候,空就会阻塞等待.
假如空包馅慢,荧擀皮快,空在盖帘满的时候,荧就会阻塞等待.

下面我们来用代码实现一个生产者消费者模型:

import java.util.concurrent.LinkedBlockingQueue;

/**
 * 生产者与消费者模型
 */
public class Demo23 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        Thread thread = new Thread(()->{
            int count = 1;
            while (true){
                try {
                    blockingQueue.put(count);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("put:" + count);
                count++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread thread1 = new Thread(()->{
            int num = 0;
            while (true){
                try {
                    num = blockingQueue.take();//当队列为空的时候就阻塞等待
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("take:" + num);
            }
        });
        thread.start();
        thread1.start();
    }
}

2.4 阻塞队列的实现

  • 通过"循环队列"的方式实现.
    循环队列描述
  • 使用synchronized进行加锁控制.
  • put插⼊元素的时候,判定如果队列满了,就进⾏wait.(注意,要在循环中进⾏wait.被唤醒时不⼀定
    队列就不满了,因为还有可能是其他的线程的interrupt唤醒了该线程
    ).
  • take取出元素的时候,判定如果队列为空,就进⾏wait.(也是循环wait)
/**
 * 实现阻塞队列
 */
public class Block {
    public int size = 0;
    public int tail = 0;
    public int head = 0;
    public int[] item = new int[10];//默认循环队列的容积是10
    public int take() throws InterruptedException {//取队列元素
        synchronized (this){
            int ret = 0;
            while (size == 0){
                this.wait();
            }
            ret = item[head];//处队头元素
            head = (head+1)%item.length;
            size--;
            this.notify();//当出了一个元素之后,唤醒put的wait()
            return ret;
        }
    }
    public void put(int num) throws InterruptedException {
        synchronized (this){
            while (size == item.length){//队列满,阻塞等待
                this.wait();
            }
            item[tail] = num;
            tail = (tail+1)%item.length;
            size++;
            this.notify();//添加一个元素之后,唤醒take的阻塞
        }
    }
/**
*开始测试
**/

    public static void main(String[] args) {
        Block block = new Block();
        Thread thread = new Thread(()->{
            int num = 1;
            while (true){
                try {
                    block.put(num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("put:"+num);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                num++;
            }
        });
        Thread thread1 = new Thread(()->{
            int ret = 0;
            while (true){
                try {
                    ret = block.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("take:"+ret);
            }
        });
        thread.start();
        thread1.start();
    }
}

测试结果:
在这里插入图片描述