java编程思想-java中的并发(一)

时间:2022-08-20 15:35:29

一、基本的线程机制

并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过使用多线程机制,这些独立任务中的每一个都将由执行线程来驱动。

线程模型为编程带来了便利,它简化了在单一程序中同时jiao'zhi'zai'yi'qi交织在一起的多个操作的处理。在使用线程时,CPU将轮流给每个任务分配其占用时间。每个人物都觉得自己在一直占用CPU,但事实上CPU时间是划分成片段分配给所有的任务。线程的一大好处就是可以使你从这个层次抽身出来,即代码不必知道他是运行在具有一个还是多个CPU的机器上。所以,使用线程机制是一种建立透明的、可扩展的程序的方法,如果程序运行的太慢,为机器增添一个CPU就能很容易的加快程序的运行速度。多任务和多线程往往是使用多处理器系统的最合理方式。

1. 定义任务

线程可以驱动任务,因此需要一种描述任务的方式,这可以由Runnable接口来提供。要想定义任务,只需实现Runable接口并编写run()方法,使得该任务可以执行你的命令。例如,下面的LiftOff任务将显示发射之前的倒计时:

public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++; public LiftOff() {
} public LiftOff(int countDown) {
this.countDown = countDown;
} public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + ").";
} @Override
public void run() {
while (countDown-- > 0) {
System.out.println(status());
Thread.yield();
}
}
}

标识符id可以用来区分任务的多个实例,他是final的,因为它一旦被初始化后,就不希望被修改。

任务的run()方法通常会有某种形式的循环,使得任务一直yun'xing运行下去,直到不再需要,所以要设定跳出循环的条件(有一种选择是直接从run()返回)。通常,run()被写成无限循环的形式,这就意味着,除非有某个条件使得run()终止,否则它将永远运行下去。

在run()中对静态方法Thread.yield()的调用时对线程调度器(java线程机制的一部分,可以将CPU从一个线程转移给另一个线程)的一种建议。

在下面的实例中,这个任务的run()不是由单独的线程驱动的,他是在main()中直接调用的(实际上,这里仍旧使用了线程,即总是分配给main()的那个线程):

public class MainThread {
public static void main(String[] args) {
LiftOff launch = new LiftOff();
launch.run();
}
}

2. Thread类

将Runnable对象转变为工作任务的传统方式是把它提交给一个Thread构造器,下面的示例展示了如何使用Thread来驱动LiftOff对象:

public class BasicThreads {
public static void main(String[] args){
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("waiting for lifting off");
}
}

Thread构造器只需要一个Runnable对象。调用Thread对象的start()方法为该线程执行必须的初始化操作,然后调用Runnable的run()方法,以便在这个新线程中启动该任务。

3. 使用Executor

JAVA SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务。Executor允许你管理异步任务的执行,而无需显式的管理线程的生命周期。Executor在Java SE5/6中是启动任务的优选方法。

public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}

非常常见的情况是,单个的Executor被用来创建和管理系统中所有的任务。

对shutdown()方法的调用可以防止新任务被提交给这个Executor,当前线程将继续运行在shutdown()被调用之前提交的所有任务。这个程序将在Executor中的所有任务完成之后尽快退出。

可以将示例中的CachedThreadPool替换为不同类型的Executor。FixedThreadPool使用了有限的线程集来执行所提交的任务:

public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}

有了FixedThreadPool,你就可以一次性yu'xian预先执行代价高昂的线程分配,因而也就可以限制线程的数量了。这可以节省时间,因为你不用为每个任务都固定的付出创建线程的开销。在事件驱动的系统中,需要线程的事件处理器,通过直接从池中获取线程,也可以如你所愿的尽快得到任务。你不会滥用可获得的资源,因为FixedThreadPool使用的Thread对象的数量是有界的。

注意,在任何线程池中,现有线程在可能的情况下,都会被自动复用。

CachedThreadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在他回收旧线程时停止创建新线程,因此它是合理的Executor的首选。只有当这种方式会引发问题时,你才需要切换到FixedThreadPool。

SingleThreadExecutor就像是线程数量为1的FixedThreadPool。这对于你希望在另一个线程中连续运行的任何事物(长期存活的任务)来说,都是很有用的,例如监听进入的套接字连接的任务。他对于希望在线程中运行的短任务也同样很方便,例如,更新本地或者远程日志的小任务,或者是事件分发线程。

如果xiang向SingleThreadExecutor提交了多个任务,那么这些任务将排队,每个任务都会在下一个任务开始之前运行结束,所有的任务将使用相同的线程。

public class SingleThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}

作为另一个示例,假如你有大量的线程,那它们运行的任务将使用文件系统。你可以用SingleThreadExecutor来运行这些线程,以确保任意时刻在任何线程中都只有唯一的任务在运行。在这种方式下,你不需要在共享资源上处理同步(同时不会过度使用文件系统)。有时更好的解决方案是在资源上同步,但是SingleThreadExecutor可以让你省去只是为了维持某些事物的原型而进行的各种协调努力。通过序列化任务,你可以消除对序列化对象的需求。

4. 从任务中产生返回值

Runable是执行工作的独立任务,但是它不返回任何值。如果你希望任务在完成时能够返回一个值,那么可以实现Callable接口而不是Runnable接口。在Java SE5中引入的Callable是一种具有类型参数的泛型,它的类型参数表示的是从方法call()(而不是run())中返回的值,并且必须使用ExecutorService.submit()方法调用他,下面是一个简单示例:

public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(exec.submit(new TaskWithResult(i)));
} for (Future<String> fs : results) {
try {
System.out.println(fs.get());
} catch (InterruptedException e) {
System.out.println(e);
return;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
} class TaskWithResult implements Callable<String> {
private int id; public TaskWithResult(int id) {
this.id = id;
} public String call() {
return "result of TaskWithResult " + id;
}
}

submit()方法会产生Future对象,他用Callable返回结果的特定类型进行了参数化。你可以用isDone()方法来查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get(),这种情况下,get()将阻塞,直至结果准备就绪。你还可以在试图调用get()来获取结果之前,先调用具有超时的get(),或者调用isDone()来查看任务是否完成。

5. 休眠

影响任务行为的一种简单方法是调用sleep(),这将使任务终止执行给定的时间。在LiftOff类中,要是把对yield()的调用换成是调用sleep(),将得到如下结果:

public class SleepingTask extends LiftOff {
public void run() {
try {
while (countDown-- > 0) {
System.out.println(status());
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
} public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new SleepingTask());
}
exec.shutdown();
}
}

对sleep()的调用可以抛出InterruptedException异常,并且你可以看到,它在run()中被捕获。因为异常不能跨线程传播回main(),所以你必须在本地处理所有在任务内部产生的异常。

你可能会注意到,这些任务是按照“完美的分布”顺序运行的,即从0到4,然后再回过头从0开始,当然这取决于你的平台。这是有意义的,因为在每个打印语句之后,每个任务都将要睡眠(即阻塞),这使得线程调度器可以切换到另一个线程,进而驱动另一个任务。但是,顺序行为依赖于底层的线程机制,这种机制在不同的操作系统之间是有差异的,因此,你不能依赖于他。如果你必须控制任务执行的顺序,那么最好的押宝就是使用同步控制,或者在某些情况下,压根不使用线程,但是要编写自己的写作例程,这些例程会按照指定的顺序在互相之间传递控制权。

6. 优先级

线程的优先级将该线程的重要性传递给了调度器。尽管CPU处理现有线程集的顺序是不确定的,但是调度器将倾向于让优先权最高的线程先执行。然而,这并不是意味着优先权较低的线程将得不到执行(也就是说,优先权不会导致死锁)。优先级较低的线程仅仅是执行的频率较低。

7. 让步

如果知道已经完成了在run()方法的循环的一次迭代过程中所需的工作,就可以给线程调度机制一个暗示:你的工作已经做得差不多了,可以让别的线程使用CPU了。这个暗示将通过调用yield()方法来作出(不过这只是一个暗示,没有任何机制保证他将被采纳)。当调用yield()时,你也是在建议具有相同优先级的其他线程可以运行。

8. 后台线程

所谓后台(daemon)线程,是指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中的所有后台线程。反过来说,只要有任何非后台线程还在运行,程序就不会终止。比如,执行main()的就是一个非后台线程。

public class SimpleDaemon implements Runnable {
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
} catch (InterruptedException e) {
System.out.println("sleep() interrupted");
}
} public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemon());
daemon.setDaemon(true);
daemon.start();
} System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
}

必须在线程启动之前调用setDaemon()方法,才能把它设置为后台线程。

一旦main()完成其工作,就没什么能阻止程序终止了,因为除了后台线程外,已经没有程序在运行了。main()线程被设定为短暂睡眠,所以可以观察到所有后台线程启动后的结果。不这样的话,你就只能看见一些后台线程创建时得到的结果。

SimpleDaemons.java创建了显式的线程,以便可以设置他们的后台标识。通过编写定制的ThreadFactory可以定制由Executor创建的线程的属性(后台、优先级、名称)。

public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}

这与普通的ThreadFactory的唯一差异就是它将后台状态全部设置为了true。你现在可以用一个新的DaemonThreadFactory作为参数传递给Executor.newCachedThreadPool():

public class DaemonFromFactory implements Runnable {
@Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread() + " " + this);
}
}catch (InterruptedException e){
System.out.println("interrupted");
}
} public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
for (int i=0;i<10;i++){
exec.execute(new DaemonFromFactory());
}
System.out.println("All daemons started");
TimeUnit.MILLISECONDS.sleep(500);
}
}

每个静态的ExecutorService创建方法都被重载为接受一个ThreadFactory对象,而这个对象将被用来创建新的线程。

可以通过调用isDaemon()方法来确定线程是否是一个后台线程。如果是一个后台线程,那么它创建的任何线程将被自动设置成后台线程。你应该意识到后台进程在不执行finally子句的情况下就会终止其run()方法。

9. 编码的变体

你可以通过调用适当的Thread构造器为Thread对象赋予具体的名称,这个名称可以通过使用getName()从toString()中获得。

另一种可能会看到的惯用法是自管理的Runnable:

public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this); public SelfManaged() {
t.start();
} public String toString() {
return Thread.currentThread().getName() + "(" + countDown + ").";
} @Override
public void run() {
while (true) {
System.out.println(this);
if (--countDown == 0) {
return;
}
}
} public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new SelfManaged();
}
}
}

这与从Thread继承并没有什么特别的差异,只是语法稍微晦涩一些。但是,实现接口使得你可以继承另一个不同的类,而从Thread继承将不行。

10. 加入一个线程

一个线程可以在其他线程之上调用join()方法,其效果是等待一段时间直到第二个线程结束才继续执行。如果某个线程在另一个线程t上调用t.join(),此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive()返回为假)。

也可以在调用join()时带上一个超时参数(单位可以是毫秒,或者毫秒和纳秒),这样如果目标线程在这段时间到期时还没有结束的话,join()方法总能返回。

对join()方法的调用可以被中断,做法是在调用线程上调用interrupt()方法,这时需要用到try-catch子句。

下面这个例子演示了所有这些操作:

public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
} class Sleeper extends Thread {
private int duration; public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
} public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
System.out.println(getName() + " was interrupted. " + "interrupted(): " + isInterrupted());
}
System.out.println(getName() + " has awakened");
}
} class Joiner extends Thread {
private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
} public void run() {
try {
sleeper.join();
} catch (InterruptedException e) {
System.out.println("interrupted.");
}
System.out.println(getName() + " join completed.");
}
}

Sleeper是一个Thread类型,它要休眠一段时间,这段时间是通过构造器传进来的参数所指定的。在run()中,sleep()方法可能在指定的时间期满时返回,但也可能被中断。在catch子句中,将根据isInterrupted()的返回值报告这个中断。然而,异常被捕获shi时将清理这个标志,所以在catch子句中,在异常被捕获时这个标志总是为假。除异常外,这个标志还可用于其他情况,比如线程可能会检查其中断状态。

注意,Java SE5的java.util.concurrent类库包含诸如CyclicBarrier这样的工具,他们可能比最初线程类库中的join()更加合适。