Chapter 5 生产者消费者系列之传统实现

时间:2022-11-05 21:13:30

1 概述

线程间通信是多线程十分重要的一个知识点,Java多线程是用基于wait/notify的等待/通知模式实现的。其一个经典的案例就是“生产者消费者模式”。其中,生产者负责生产商品,消费者负责消费商品。在没有商品时,消费者必须等待生产者生产;而在已经有商品时,生产者必须等待消费者消费完才能继续消费。当然,还有“变种”模式,就是生产者可以生产商品堆积,但是堆积的数目不能超过一定的数目。可是,原理还是类似的,下面就生产商品不堆积的情况介绍一下Java多线程的实现方式。

2 最简单的模型:一生产者一消费者

生产者消费者最简单的模型就是只有一个生产者和一个消费者,Java的实现方式也很简单,参考下面的代码。其中,抽象了一个商品类即Resource类,为了方便起见,为其定义了一个属性即它的id,在生产的时候这个id随机产生(小于1000)。Resource类拥有生产和消费两个方法,这两个方法都是加synchronized关键词同步的,持有的锁都是this锁。另外,在Resource类中定义了一个标志位,用于标识商品是不是已经生产了。在调用生产方法时,判断这个标志位,如果为真就表示已经有商品,生产方法需要等待,当其被消费者消费以后唤醒,执行生产操作,将标志位置真,唤醒消费完以后执行等待的消费者。就这样,生产者和消费者“你来我往”,执行本身操作以后,切换标志位,唤醒对方,本身今日等待状态。

import java.util.Random;
 
/**
 * Created by fubinhe on 16/9/30.
 */
public class SingleProducerConsumer {
    public static void main(String[] args) {
        Resource res = new Resource();
        Producer producer = new Producer(res);
        Consumer consumer = new Consumer(res);
        Thread proThread = new Thread(producer);
        Thread conThread = new Thread(consumer);
        proThread.start();
        conThread.start();
    }
}
 
class Producer implements Runnable {
 
    private Resource res;
 
    public Producer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.produce(new Random().nextInt(1000));
        }
    }
}
 
class Consumer implements Runnable {
 
    private Resource res;
 
    public Consumer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.consume();
        }
    }
}
 
class Resource {
 
    private int id;
 
    private boolean hasProduced = false;
 
    public synchronized void produce(int id) {
        if (hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.id = id;
        System.out.println("produced..." + id);
        hasProduced = true;
        this.notify();
    }
 
    public synchronized void consume() {
        if (!hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("consumed......" + this.id);
        hasProduced = false;
        this.notify();
    }
}

这个程序的执行结果如下图所示,生产状态和消费状态不断的切换。

Chapter 5 生产者消费者系列之传统实现

3 多生产者和消费者

当有多个生产者和消费者的数目不再单一时,上面的模型就会出现一些问题。下面以两个生产者和两个消费者为例(因为具体数目有多少个的原理都是类似的,方便起见只选择两个),说明当出现多个生产者和消费者时,会遇到的一些问题和我们的解决方式。

3.1 执行权一直被某一方占有

我们将上面的代码稍微改动一下,分别开辟两个生产者和消费者线程。

import java.util.Random;
 
/**
 * Created by fubinhe on 16/9/30.
 */
public class MultipleProducerConsumer {
    public static void main(String[] args) {
        Resource res = new Resource();
        Producer producer = new Producer(res);
        Consumer consumer = new Consumer(res);
        Thread proThread1 = new Thread(producer);
        Thread proThread2 = new Thread(producer);
        Thread conThread1 = new Thread(consumer);
        Thread conThread2 = new Thread(consumer);
        proThread1.start();
        proThread2.start();
        conThread1.start();
        conThread2.start();
    }
}
 
class Producer implements Runnable {
 
    private Resource res;
 
    public Producer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.produce(new Random().nextInt(1000));
        }
    }
}
 
class Consumer implements Runnable {
 
    private Resource res;
 
    public Consumer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.consume();
        }
    }
}
 
class Resource {
 
    private int id;
 
    private boolean hasProduced = false;
 
    public synchronized void produce(int id) {
        if (hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.id = id;
        System.out.println("produced..." + id);
        hasProduced = true;
        this.notify();
    }
 
    public synchronized void consume() {
        if (!hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("consumed......" + this.id);
        hasProduced = false;
        this.notify();
    }
}

然后其他代码都不需要改动,执行以后可能会出现类似下图的结果,就是某一方会一直占有执行权,而对方没有机会执行。出现这种情况,是因为始终只有一个线程会抢到main函数中的res对象的this锁而得到执行权,例如生产者proThread1抢到,而其他三个线程都在等待状态(就是另外一个生产者线程和两个消费者线程),当前生产完了以后,就会唤醒等待的线程。而这个时候有三个线程在等待,不幸的是,执行权可能被另外一个生产者线程抢到,更不幸的是,执行权一直被这两个生产者轮流抢到,而消费者线程却一直“眼巴巴”的在旁边看着得不得执行的权力。就这样,会出现下图这样的情形。当然,也可能出现一直是消费者线程在执行的情况。

Chapter 5 生产者消费者系列之传统实现

3.2 假死

为了解决上面的那个问题,可以将上面的代码中,produce方法和consume方法最开始的地方将if判断改为while。这样的话,当某个线程被唤醒时,它会回头判断是谁唤醒它的,如果是自己一方的,就接着等待,否则才得到执行权进行下面的操作。

import java.util.Random;
 
/**
 * Created by fubinhe on 16/9/30.
 */
public class MultipleProducerConsumer {
    public static void main(String[] args) {
        Resource res = new Resource();
        Producer producer = new Producer(res);
        Consumer consumer = new Consumer(res);
        Thread proThread1 = new Thread(producer);
        Thread proThread2 = new Thread(producer);
        Thread conThread1 = new Thread(consumer);
        Thread conThread2 = new Thread(consumer);
        proThread1.start();
        proThread2.start();
        conThread1.start();
        conThread2.start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
        Thread.currentThread().getThreadGroup().enumerate(threads);
        for (Thread thread : threads) {
            System.out.println(thread.getName() + ": " + thread.getState());
        }
    }
}
 
class Producer implements Runnable {
 
    private Resource res;
 
    public Producer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.produce(new Random().nextInt(1000));
        }
    }
}
 
class Consumer implements Runnable {
 
    private Resource res;
 
    public Consumer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.consume();
        }
    }
}
 
class Resource {
 
    private int id;
 
    private boolean hasProduced = false;
 
    public synchronized void produce(int id) {
        while (hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.id = id;
        System.out.println("produced..." + id);
        hasProduced = true;
        this.notify();
    }
 
    public synchronized void consume() {
        while (!hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("consumed......" + this.id);
        hasProduced = false;
        this.notify();
    }
}

当将if改为while时,就可以消除某一方一直执行的问题,但是会出现“假死”的危险,如下图所示。在上面代码中的main函数中,添加了对所有线程的状态判断。发现生成者和消费者现在全部是处在等待的状态。出现这样的情况,是因为,当某个线程被自己方唤醒后重新判断后继续等待,而由于它没有出synchronized修饰的方法,其持有的this锁一直不会释放,这样就出现“假死”的情况。

Chapter 5 生产者消费者系列之传统实现

3.3 终极解决方法

最终的解决方式,就是不能一次只唤醒一个,因为这个被唤醒的有可能是自己方的。那么,将notify方法换成notifyAll方法,就是唤醒时唤醒所有在等待的线程,也不分哪一方了。这样就能解决“假死”的问题。最终的代码和执行结果如下。

import java.util.Random;
 
/**
 * Created by fubinhe on 16/9/30.
 */
public class MultipleProducerConsumer {
    public static void main(String[] args) {
        Resource res = new Resource();
        Producer producer = new Producer(res);
        Consumer consumer = new Consumer(res);
        Thread proThread1 = new Thread(producer);
        Thread proThread2 = new Thread(producer);
        Thread conThread1 = new Thread(consumer);
        Thread conThread2 = new Thread(consumer);
        proThread1.start();
        proThread2.start();
        conThread1.start();
        conThread2.start();
    }
}
 
class Producer implements Runnable {
 
    private Resource res;
 
    public Producer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.produce(new Random().nextInt(1000));
        }
    }
}
 
class Consumer implements Runnable {
 
    private Resource res;
 
    public Consumer(Resource res) {
        this.res = res;
    }
 
    @Override
    public void run() {
        while (true) {
            res.consume();
        }
    }
}
 
class Resource {
 
    private int id;
 
    private boolean hasProduced = false;
 
    public synchronized void produce(int id) {
        while (hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.id = id;
        System.out.println("produced..." + id);
        hasProduced = true;
        this.notifyAll();
    }
 
    public synchronized void consume() {
        while (!hasProduced) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("consumed......" + this.id);
        hasProduced = false;
        this.notifyAll();
    }
}

Chapter 5 生产者消费者系列之传统实现

4 传统方法的缺点

通过上面的一步步剖析,可以发现传统方式实现生产者消费者模式时,唤醒线程的方式比较“费劲”。而且这种“不分你我”的唤醒方式的一个明显的缺点是,比较消耗资源,因为有些线程并唤醒后也不能执行重新进入等待状态。Java 5引入了Condition对象,通过不同的Condition对象更有针对性的唤醒等待的线程,具体可以参考下一篇blog:Chapter 6 生产者消费者之Condition实现