Java中的阻塞队列-ArrayBlockingQueue(一)

时间:2022-09-25 13:29:25

最近在看一些java基础的东西,看到了队列这章,打算对复习的一些知识点做一个笔记,也算是对自己思路的一个整理,本章先聊聊java中的阻塞队列

参考文章:

http://ifeve.com/java-blocking-queue/

https://blog.csdn.net/u014082714/article/details/52215130

Java中的阻塞队列-ArrayBlockingQueue(一)

由上图可以用看出java中的阻塞队列都实现了 BlockingQueue接口,BlockingQueue又继承自Queue

1、什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

Java中的阻塞队列-ArrayBlockingQueue(一)

  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

2.、Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
  • DelayQueue:一个使用优先级队列实现的*阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
   throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull = lock.newCondition();
}

通过源码我们可以看到,构造器第一个参数是指定有界队列的大小(及数组的大小),第二个参数指定是否使用公平锁,这里可以看到阻塞队列的公平访问队列是通过重入锁来实现的(关于重入锁我们在别的章节介绍)

下边我们结合源码对其提供的方法做一个简单分析

关于构造器相关说明

/**
*
* 构造函数,设置队列的初始容量
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
} /**
* 构造函数。capacity设置数组大小 ,fair设置是否为公平锁
* capacity and the specified access policy.
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);//是否为公平锁,如果是的话,那么先到的线程先获得锁对象。
//否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高
notEmpty = lock.newCondition();
notFull = lock.newCondition();
} /**
*构造函数,带有初始内容的队列
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair); final ReentrantLock lock = this.lock;
lock.lock(); //要给数组设置内容,先上锁
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;//依次拷贝内容
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;//如果putIndex大于数组大小 ,那么从0重新开始
} finally {
lock.unlock();//最后一定要释放锁
}
}
关于方法的说明

/** 
     * 添加一个元素,其实super.add里面调用了offer方法 
     */  
    public boolean add(E e) {  
        return super.add(e);  
    }  
 
/**
* 当调用offer方法返回false时,直接抛出异常
*/
     public boolean add(E e) {
       if (offer(e))
           return true;
else
throw new IllegalStateException("Queue full");
}
}
    /** 
     *加入成功返回true,否则返回false 
     *  
     */  
    public boolean offer(E e) {  
        checkNotNull(e);  
        final ReentrantLock lock = this.lock;  
        lock.lock();//上锁  
        try {  
            if (count == items.length) //超过数组的容量  
                return false;  
            else {  
                enqueue(e); //放入元素  
                return true;  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 如果队列已满的话,就会等待 
     */  
    public void put(E e) throws InterruptedException {  
        checkNotNull(e);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出  
        try {  
            while (count == items.length)  
                notFull.await(); //这里就是阻塞了,要注意。如果运行到这里,那么它会释放上面的锁,一直等到notify  
            enqueue(e);  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 带有超时时间的插入方法,unit表示是按秒、分、时哪一种 
     */  
    public boolean offer(E e, long timeout, TimeUnit unit)  
        throws InterruptedException {  
  
        checkNotNull(e);  
        long nanos = unit.toNanos(timeout);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == items.length) {  
                if (nanos <= 0)  
                    return false;  
                nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法  
            }  
            enqueue(e);//入队  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //实现的方法,如果当前队列为空,返回null  
    public E poll() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return (count == 0) ? null : dequeue();  
        } finally {  
            lock.unlock();  
        }  
    }  
     //实现的方法,如果当前队列为空,一直阻塞  
    public E take() throws InterruptedException {  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == 0)  
                notEmpty.await();//队列为空,阻塞方法  
            return dequeue();  
        } finally {  
            lock.unlock();  
        }  
    }  
    //带有超时时间的取元素方法,否则返回Null  
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
        long nanos = unit.toNanos(timeout);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == 0) {  
                if (nanos <= 0)  
                    return null;  
                nanos = notEmpty.awaitNanos(nanos);//超时等待  
            }  
            return dequeue();//取得元素  
        } finally {  
            lock.unlock();  
        }  
    }  
    //只是看一个队列最前面的元素,取出是不删除队列中的原来元素。队列为空时返回null  
    public E peek() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return itemAt(takeIndex); // 队列为空时返回null  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 返回队列当前元素个数 
     * 
     */  
    public int size() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return count;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 返回当前队列再放入多少个元素就满队 
     */  
    public int remainingCapacity() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return items.length - count;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     *  从队列中删除一个元素的方法。删除成功返回true,否则返回false 
     */  
    public boolean remove(Object o) {  
        if (o == null) return false;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    if (o.equals(items[i])) {  
                        removeAt(i); //真正删除的方法  
                        return true;  
                    }  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);//一直不断的循环取出来做判断  
            }  
            return false;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 是否包含一个元素 
     */  
    public boolean contains(Object o) {  
        if (o == null) return false;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    if (o.equals(items[i]))  
                        return true;  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);  
            }  
            return false;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 清空队列 
     * 
     */  
    public void clear() {  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            int k = count;  
            if (k > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    items[i] = null;  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);  
                takeIndex = putIndex;  
                count = 0;  
                if (itrs != null)  
                    itrs.queueIsEmpty();  
                for (; k > 0 && lock.hasWaiters(notFull); k--)  
                    notFull.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 取出所有元素到集合 
     */  
    public int drainTo(Collection<? super E> c) {  
        return drainTo(c, Integer.MAX_VALUE);  
    }  
  
    /** 
     * 取出所有元素到集合 
     */  
    public int drainTo(Collection<? super E> c, int maxElements) {  
        checkNotNull(c);  
        if (c == this)  
            throw new IllegalArgumentException();  
        if (maxElements <= 0)  
            return 0;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            int n = Math.min(maxElements, count);  
            int take = takeIndex;  
            int i = 0;  
            try {  
                while (i < n) {  
                    @SuppressWarnings("unchecked")  
                    E x = (E) items[take];  
                    c.add(x);  
                    items[take] = null;  
                    if (++take == items.length)  
                        take = 0;  
                    i++;  
                }  
                return n;  
            } finally {  
                // Restore invariants even if c.add() threw  
                if (i > 0) {  
                    count -= i;  
                    takeIndex = take;  
                    if (itrs != null) {  
                        if (count == 0)  
                            itrs.queueIsEmpty();  
                        else if (i > take)  
                            itrs.takeIndexWrapped();  
                    }  
                    for (; i > 0 && lock.hasWaiters(notFull); i--)  
                        notFull.signal();  
                }  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  

Java中的阻塞队列-ArrayBlockingQueue(一)的更多相关文章

  1. 聊聊并发(七)——Java中的阻塞队列

    3. 阻塞队列的实现原理 聊聊并发(七)--Java中的阻塞队列 作者 方腾飞 发布于 2013年12月18日 | ArchSummit全球架构师峰会(北京站)2016年12月02-03日举办,了解更 ...

  2. Java中的阻塞队列(BlockingQueue)

    1. 什么是阻塞队列 阻塞队列(BlockingQueue)是 Java 5 并发新特性中的内容,阻塞队列的接口是 java.util.concurrent.BlockingQueue,它提供了两个附 ...

  3. 多线程编程学习六&lpar;Java 中的阻塞队列&rpar;&period;

    介绍 阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满:当队列空时,队列会阻塞获得元素的线程,直到队列变非空.阻塞队列就是生产者用来存放元素.消费者用来获取 ...

  4. 阻塞队列一——java中的阻塞队列

    目录 阻塞队列简介:介绍阻塞队列的特性与应用场景 java中的阻塞队列:介绍java中实现的供开发者使用的阻塞队列 BlockQueue中方法:介绍阻塞队列的API接口 阻塞队列的实现原理:具体的例子 ...

  5. Java中的阻塞队列

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用 ...

  6. java 中的阻塞队列

    1.什么是阻塞队列: 支持阻塞的插入方法,意思是当队列满时,队列会阻塞插入元素的线程,知道队列不满. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空. 插入和移除操作的4种处 ...

  7. Java核心知识点学习----多线程中的阻塞队列&comma;ArrayBlockingQueue介绍

    1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过s ...

  8. java 5并发中的阻塞队列ArrayBlockingQueue的使用以及案例实现

    演示一个阻塞队列的使用 public class BlockingQueueTest { public static void main(String[] args) { //创建一个包含三个元素的阻 ...

  9. java中使用阻塞队列实现生产这与消费这之间的关系

    需求如下: 有一个生产者和一个消费者,生产者不断的生产产品,消费这不断的消费产品.产品总数为N. 1.生产顺序按队列的方式,先进先出. 2.生产者和消费这可以同时进行. 3.当生产者生产了N个产品后不 ...

随机推荐

  1. 分页查询的两种方法(双top 双order 和 row&lowbar;number&lpar;&rpar; over &lpar;&rpar;)

    --跳过10条取2条 也叫分页select top 2 * from studentwhere studentno not in (select top 2 studentno from studen ...

  2. Saltstack pillar组件

     pillar组件 pillar也是Saltstack最重要的组件之一,其作用是定义与被控主机相关的任何数据,定义好的数据可以被其他组件使用,如模板.state.API等.在pillar中定义的数据与 ...

  3. 【原】训练自己的haar-like特征分类器并识别物体(3)

    在前两篇文章中,我介绍了<训练自己的haar-like特征分类器并识别物体>的前三个步骤: 1.准备训练样本图片,包括正例及反例样本 2.生成样本描述文件 3.训练样本 4.目标识别 == ...

  4. 安装服务Memcached&plus;Nginx&plus;Php linux下安装

    Memcached安装 1.      源码安装libevent(下载地址:http://monkey.org/~provos/libevent/) 2.      源码安装memcached(下载地 ...

  5. 详解MySQL中EXPLAIN解释命令&lpar;转&rpar;

    explain显示了mysql如何使用索引来处理select语句以及连接表.可以帮助选择更好的索引和写出更优化的查询语句. 使用方法,在select语句前加上explain就可以了: 如: expla ...

  6. 输出第N个素数

    输出第N个素数 public class FindNthPrime { public static void main(String[] args){ int N = Integer.parseInt ...

  7. bzoj1233&colon; &lbrack;Usaco2009Open&rsqb;干草堆tower

    Description 奶牛们讨厌黑暗. 为了调整牛棚顶的电灯的亮度,Bessie必须建一座干草堆使得她能够爬上去够到灯泡 .一共有N大包的干草(1<=N<=100000)(从1到N编号) ...

  8. &lbrack;转载&rsqb; Hadoop MapReduce

    转载自http://blog.csdn.net/yfkiss/article/details/6387613和http://blog.csdn.net/yfkiss/article/details/6 ...

  9. ss客户端以及tcp&comma;udp&comma;dns代理ss-tproxy在线安装版--centos7&period;3 x64以上&lpar;7&period;3-7&period;6x64测试通过&rpar;

    #!/bin/sh # # Script for automatic setup of an SS-TPROXY server on CentOS 7.3 Minimal. # export PATH ...

  10. oracle查询指定月份数据

    SELECT * FROM [表名]       where  to_number(to_char([表中日期字段],'mm')) = [要查找的月份]