java实现生产者/消费者的三种方式

时间:2023-03-09 07:55:10
java实现生产者/消费者的三种方式
package com.wenki.thread;

import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProductAndConsume {

    public static void main(String[] args) {
        ProductAndConsume o = new ProductAndConsume();
//        Storage storage = o.new StorageOne();
//        Storage storage = o.new StorageTwo();
        Storage storage = o.new StorageThree();
        Consumer consumer1 = o.new Consumer(storage);
        Consumer consumer2 = o.new Consumer(storage);
        Producter producter1 = o.new Producter(storage);
        Producter producter2 = o.new Producter(storage);
        Producter producter3 = o.new Producter(storage);
        consumer1.start();
        consumer2.start();
        producter1.start();
        producter2.start();
        producter3.start();
    }

    class Producter extends Thread{
        Storage storage;
        public Producter(Storage storage) {
            this.storage = storage;
        }
        public void product(){
            this.storage.product();
        }
        @Override
        public void run() {
            for(;;){
                product();
            }
        }
    }

    class Consumer extends Thread{
        Storage storage;
        public Consumer(Storage storage){
            this.storage = storage;
        }
        public void consume(){
            this.storage.consume();
        }
        @Override
        public void run() {
            for(;;){
                consume();
            }
        }
    }

    interface Storage{
        int MAX_SIZE = 100;
        LinkedList<Object> list = new LinkedList<Object>();
        public abstract void product();
        public abstract void consume();
    }

    //第一种方式  wait() + notify()
    class StorageOne implements Storage{

        @Override
        public void product() {
            synchronized (list) {
                while(list.size() == MAX_SIZE){
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(list.add(new Object())){
                    System.out.println("生产 ### 产品数量 : " + list.size());
                    //通知消费者可以继续消费
                    list.notifyAll();
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void consume() {
            synchronized (list) {
                while(list.size() == 0){
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(list.remove() != null){
                    System.out.println("消费 ### 产品数量: " + list.size());
                    //通知生产者可以继续生产
                    list.notifyAll();
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }

    //第二种方式 await() + signal()
    class StorageTwo implements Storage{

        Lock lock = new ReentrantLock();
        Condition fully = lock.newCondition();
        Condition empty = lock.newCondition();

        @Override
        public void product() {
            lock.lock();
            try{
                while(list.size() == MAX_SIZE){
                    fully.await();
                }
                if(list.add(new Object())){
                    System.out.println("生产 ### 产品数量 : " + list.size());
                    empty.signalAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }

        }

        @Override
        public void consume() {
            lock.lock();
            try{
                while(list.size() == 0){
                    empty.await();
                }
                if(list.remove() != null){
                    System.out.println("消费 ### 产品数量: " + list.size());
                    fully.signalAll();
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                lock.unlock();
            }
        }

    }

    //第三种 BlockingQueue 阻塞队列
    class StorageThree implements Storage{

        LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(MAX_SIZE);

        @Override
        public void product() {
            try {
                list.put(new Object());
                System.out.println("生产 ### 产品数量 : " + list.size());
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

        @Override
        public void consume() {
            try {
                if(list.take() != null){
                    System.out.println("消费 ### 产品数量: " + list.size());
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }
}