Java集合源码学习(15)_Queue接口的实现PriorityQueue和PriorityBlockingQueue

时间:2022-03-31 14:48:05

一:PriorityQueue

继承自AbstractQueue类,队列里面的数据根据指定的Comparator或者自然排序(元素的compareTo())排序;

1:*的队列(可以动态扩展)

2:内部实现基于堆

3:队列的头元素,是按照排序规则最小的元素;

一:类变量:

private static final int DEFAULT_INITIAL_CAPACITY = 11;//默认的初始化容量
//实际的存放元素的数组,其实实现的是一个最小堆。
private transient Object[] queue;
//queue里面的元素的实际数量
private int size = 0;
//指定的比较器
private final Comparator<? super E> comparator;

二:几个方法

1:容量扩展

private void grow(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = ((oldCapacity < 64) ? ((oldCapacity + 1) * 2) : ((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);
}

2:元素入队列

public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);//如果空间不足,扩容
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);//将数据放置到堆的最后位置,自下而上的调整堆
return true;
}
自下而上调整的算法如下:
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}

private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}

private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}

3:元素出队列

public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);//移除堆最上面的元素,然后自上而下调整堆
return result;
}
自上而下调整的算法如下:
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}

private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size && ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}

private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size && comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}

学习过数据结构的话,如果对堆的算法有了解,理解上面这几个方法应该比较容易

二:PriorityBlockingQueue

继承自AbstractQueue,实现了BlockingQueue接口; 其实是,包装了PriorityQueue,对相应的方法进行了加锁的控制; 当然,也是一个*队列;

一:类的变量

private final PriorityQueue<E> q;//实际存储数据的队列
private final ReentrantLock lock = new ReentrantLock(true);//公平的锁
private final Condition notEmpty = lock.newCondition();//因为是无解的队列,所以不需要notFull的条件变量

二:关键方法

1:offer方法(立即返回,成功true)

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean ok = q.offer(e);
assert ok;
notEmpty.signal();//唤醒一个等待线程
return true;
} finally {
lock.unlock();
}
}

2:add()(抛异常)方法和put()方法(阻塞)和offer(time)(阻塞)

入队列都不会阻塞,直接调用offer方法的
public boolean add(E e) {
return offer(e);
}
public void put(E e) {offer(e); // never need to block}
public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e); // never need to block}

3:poll()方法,不阻塞

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.poll();//如果不存在返回null
} finally {
lock.unlock();
}
}

4:take()阻塞式获取

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (q.size() == 0)
notEmpty.await();
} catch (InterruptedException ie) {//不太理解为何有signal了一次?
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();
assert x != null;
return x;
} finally {
lock.unlock();
}
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E x = q.poll();
if (x != null)
return x;
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}