package com.wenki.thread; import java.util.LinkedList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProductAndConsume { public static void main(String[] args) { ProductAndConsume o = new ProductAndConsume(); // Storage storage = o.new StorageOne(); // Storage storage = o.new StorageTwo(); Storage storage = o.new StorageThree(); Consumer consumer1 = o.new Consumer(storage); Consumer consumer2 = o.new Consumer(storage); Producter producter1 = o.new Producter(storage); Producter producter2 = o.new Producter(storage); Producter producter3 = o.new Producter(storage); consumer1.start(); consumer2.start(); producter1.start(); producter2.start(); producter3.start(); } class Producter extends Thread{ Storage storage; public Producter(Storage storage) { this.storage = storage; } public void product(){ this.storage.product(); } @Override public void run() { for(;;){ product(); } } } class Consumer extends Thread{ Storage storage; public Consumer(Storage storage){ this.storage = storage; } public void consume(){ this.storage.consume(); } @Override public void run() { for(;;){ consume(); } } } interface Storage{ int MAX_SIZE = 100; LinkedList<Object> list = new LinkedList<Object>(); public abstract void product(); public abstract void consume(); } //第一种方式 wait() + notify() class StorageOne implements Storage{ @Override public void product() { synchronized (list) { while(list.size() == MAX_SIZE){ try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if(list.add(new Object())){ System.out.println("生产 ### 产品数量 : " + list.size()); //通知消费者可以继续消费 list.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Override public void consume() { synchronized (list) { while(list.size() == 0){ try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if(list.remove() != null){ System.out.println("消费 ### 产品数量: " + list.size()); //通知生产者可以继续生产 list.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //第二种方式 await() + signal() class StorageTwo implements Storage{ Lock lock = new ReentrantLock(); Condition fully = lock.newCondition(); Condition empty = lock.newCondition(); @Override public void product() { lock.lock(); try{ while(list.size() == MAX_SIZE){ fully.await(); } if(list.add(new Object())){ System.out.println("生产 ### 产品数量 : " + list.size()); empty.signalAll(); } TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } @Override public void consume() { lock.lock(); try{ while(list.size() == 0){ empty.await(); } if(list.remove() != null){ System.out.println("消费 ### 产品数量: " + list.size()); fully.signalAll(); } TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } } //第三种 BlockingQueue 阻塞队列 class StorageThree implements Storage{ LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(MAX_SIZE); @Override public void product() { try { list.put(new Object()); System.out.println("生产 ### 产品数量 : " + list.size()); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void consume() { try { if(list.take() != null){ System.out.println("消费 ### 产品数量: " + list.size()); } TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }