【Java多线程系列三】实现线程同步的方法

时间:2021-07-14 11:00:01

两种实现线程同步的方法

方法 特性
synchronized  不需要显式地加解锁,易实现
ReentrantLock 需要显式地加解锁,灵活性更好,性能更优秀,结合Condition可实现多种条件锁 
package com.concurrent.test;

import java.util.Stack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; /**
* @Description: 三种方法实现生产者/消费者
*/
public class ThreadSynchronizeTest {

    /**
     * Starts the producer/consumer demo using the {@link java.util.concurrent.BlockingQueue}
     * based implementation. The worker threads are non-daemon, so the JVM keeps running
     * until it is terminated externally.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumerViaBlockingQueue();
        producerConsumer.test();
    }
}

/**
 * Skeleton of a producer/consumer demo: subclasses supply one {@link #produce()} step and
 * one {@link #consume()} step, and {@link #test()} runs each of them in its own thread.
 */
abstract class ProducerConsumer {
    /** Maximum number of elements the shared buffer may hold. */
    protected int capacity = 10;

    /** Value of the most recently produced element; incremented once per produce step. */
    protected int element = 0;

    /** Pause, in milliseconds, taken by the producer after each element (0 disables it). */
    protected long produceDelayMillis = 1000L;

    /**
     * Produces exactly one element, blocking while the buffer is full.
     *
     * @throws InterruptedException if the calling thread is interrupted while blocked
     */
    protected abstract void produce() throws InterruptedException;

    /**
     * Consumes exactly one element, blocking while the buffer is empty.
     *
     * @throws InterruptedException if the calling thread is interrupted while blocked
     */
    protected abstract void consume() throws InterruptedException;

    /**
     * Starts one producer thread and one consumer thread. Each thread repeats its step
     * until it is interrupted.
     */
    public void test() {
        // Created in this order so the default thread names match producer-first numbering.
        Thread producer = newWorker(true);
        Thread consumer = newWorker(false);
        producer.start();
        consumer.start();
    }

    /**
     * Builds a thread that repeatedly calls {@link #produce()} or {@link #consume()}.
     *
     * @param producing {@code true} for a producer loop, {@code false} for a consumer loop
     * @return the new, not yet started thread
     */
    private Thread newWorker(final boolean producing) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        if (producing) {
                            produce();
                        } else {
                            consume();
                        }
                    }
                } catch (InterruptedException e) {
                    // Interruption is the stop request: restore the flag and leave the loop
                    // instead of swallowing it and spinning forever.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /**
     * Prints one log line in the form {@code <threadId>,<threadName> <action> <value>}.
     *
     * @param action verb to print, e.g. {@code "produce"} or {@code "consume"}
     * @param value  the element that was produced or consumed
     */
    protected void report(String action, int value) {
        Thread current = Thread.currentThread();
        System.out.println(current.getId() + "," + current.getName() + " " + action + " " + value);
    }

    /**
     * Sleeps for {@link #produceDelayMillis}. Must be called without holding any lock so
     * the consumer is not blocked for the duration of the pause.
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    protected void pauseAfterProduce() throws InterruptedException {
        if (produceDelayMillis > 0L) {
            Thread.sleep(produceDelayMillis);
        }
    }
}

/**
 * Approach 1: {@link ReentrantLock} combined with two {@link Condition}s.
 */
class ProducerConsumerViaReentrantLock extends ProducerConsumer {
    private final Stack<Integer> stack = new Stack<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    @Override
    protected void produce() throws InterruptedException {
        // Acquire before entering try: if lock() itself fails, finally must not unlock.
        lock.lock();
        try {
            // Re-check in a loop: await() may return spuriously, or another thread may
            // have refilled the buffer before this one re-acquired the lock.
            while (stack.size() >= capacity) {
                notFull.await();
            }
            ++element;
            report("produce", element);
            stack.push(element);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
        pauseAfterProduce();
    }

    @Override
    protected void consume() throws InterruptedException {
        lock.lock();
        try {
            // Loop for the same reason as in produce(): with signalAll() several
            // consumers may wake up for a single element.
            while (stack.isEmpty()) {
                notEmpty.await();
            }
            int consumed = stack.pop();
            report("consume", consumed);
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Approach 2: {@code synchronized} combined with {@code wait}/{@code notifyAll}.
 */
class ProducerConsumerViaObjectLock extends ProducerConsumer {
    private final Stack<Integer> stack = new Stack<>();
    private final Object lock = new Object();

    @Override
    protected void produce() throws InterruptedException {
        /*
         * 1. `lock` is the monitor.
         * 2. wait/notify/notifyAll must be called inside a synchronized block on that monitor.
         * 3. Calling them without owning the monitor throws
         *    java.lang.IllegalMonitorStateException.
         */
        synchronized (lock) {
            // wait() must always be guarded by a loop (spurious wakeups, shared notifyAll).
            while (stack.size() >= capacity) {
                lock.wait();
            }
            ++element;
            report("produce", element);
            stack.push(element);
            lock.notifyAll();
        }
        pauseAfterProduce();
    }

    @Override
    protected void consume() throws InterruptedException {
        synchronized (lock) {
            while (stack.isEmpty()) {
                lock.wait();
            }
            int consumed = stack.pop();
            report("consume", consumed);
            lock.notifyAll();
        }
    }
}

/**
 * Approach 3: {@link BlockingQueue}, which handles the blocking and signalling internally.
 */
class ProducerConsumerViaBlockingQueue extends ProducerConsumer {
    /** Pause, in milliseconds, taken by the consumer after each element (0 disables it). */
    protected long consumeDelayMillis = 10000L;

    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

    @Override
    protected void produce() throws InterruptedException {
        ++element;
        report("produce", element);
        // Blocks while the queue already holds `capacity` elements.
        queue.put(element);
        pauseAfterProduce();
    }

    @Override
    protected void consume() throws InterruptedException {
        // Blocks while the queue is empty.
        int consumed = queue.take();
        report("consume", consumed);
        // The consumer is deliberately slower than the producer so the queue fills up.
        if (consumeDelayMillis > 0L) {
            Thread.sleep(consumeDelayMillis);
        }
    }
}