基于synchronized实现的阻塞队列

时间:2021-10-16 19:57:16
 package com.lilei.pack09;

 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class MySyncQueue<T> { private Object[] ts; int pos = -1; public MySyncQueue(int size) {
ts = new Object[size];
} public synchronized void push(T t) throws InterruptedException {
while (true) {
if (pos + 1 < ts.length) {
ts[++pos] = t;
notifyAll();
System.out.println(Thread.currentThread().getName() + " push,currentSize=" + (pos + 1));
return;
} else {
wait();
}
}
} public synchronized T pop() throws InterruptedException { while (true) {
if (pos >= 0) {
@SuppressWarnings("unchecked")
T t = (T) ts[pos--]; notifyAll(); System.out.println(Thread.currentThread().getName() + " pop,currentSize=" + (pos + 1)); return t;
} else {
wait();
}
} } public static class Inner { } public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(30); final MySyncQueue<Inner> queue = new MySyncQueue<Inner>(15); int repeat = 1000; while (repeat-->0) { for (int i = 0; i < 15; i++) {
es.execute(new Runnable() {
public void run() { try {
queue.pop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} for (int i = 0; i < 15; i++) {
es.execute(new Runnable() {
public void run() { try {
queue.push(new Inner());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} } es.shutdown();
} }