Java并发编程二三事

时间:2021-11-17 11:27:18

Java并发编程二三事

转自我的Github

近日重新翻了一下《Java Concurrency in Practice》故以此文记之。

我觉得Java的并发可以从下面三个点去理解:

* 编译重排序
* 内存模型
* 资源竞争

这三个概念在Java并发中是非常重要的,最好也可以阅读相关资料理解一下。

一些概念

Amdahl定律

在包含N个处理器的机器中,最高的加速比为:Sppedup<=1/(F+(1-F)/n)。当N接近无穷大,最大的加速比趋近于1/F。因此,如果程序中有50%的计算需要串行执行,那么最高的加速比只有2;如果程序中有10%的计算需要串行执行,那么最高的加速比接近10。

Happens-Before

JMM为程序中所有的操作定义了一个偏序关系,称之为Happens-Before。想要保证执行操作B的线程可以看到操作A的结果(无论A,B是 否在同一个线程中执行),那么在A和B之间必须满足Happens-Before关系。如果两个操作之间缺乏Happens-Before关系,那么 JVM可以对它们任意地重排序,这会导致不一致的运行结果。

Happens-Before的规则包括:

程序顺序规则 如果程序中A在操作B之前,那么线程中A操作将在B操作之前执行。(同一线程中)

监视器锁规则 在监视器锁上的解锁操作必须在同一个监视器锁上的加锁操作之前执行。

volatile变量规则 对volatile变量的写入操作必须在对该变量的读操作之前执行。

线程启动规则 在线程上对Thread.Start的调用必须在该线程中执行任何操作之前执行。

线程结束规则 线程中的任何操作都必须在其他线程检测到该线程已经结束之前执行,或者从Thread.join中成功返回,或者在调用Thread.isAlive时返回false。

中断规则 当一个线程在另一个线程上调用interrupt时,必须在被中断线程检测到interrupt调用之前执行(通过抛出InterruptedException,或者调用isInterrupted和interrupted)。

终结器规则 对象的构造函数必须在启动该对象的终结器之前执行完成。

传递性 如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。

要注意的是Happens-Before规则更多的是从可见性的方面去理解,这样可以更容易的理解Happens-Before规则,以及JVM的重排序。例如程序顺序规则从可见性方面可以这样理解,在B操作执行的时候,A操作的执行结果对B操作可见;监视器锁规则可以这样理解,在同一个监视器上的解锁操作执行的时候,监视器上的加锁操作的执行结果对于解锁操作可见。

final域的安全发布

初始化安全性将确保对于被正确构造的对象,所有线程都能看到由构造函数为对象给各个final域设置的正确值,而不管采用何种方式来发布对象。 而且对于可以通过被正确构造对象中某个final域到达的任意变量(例如某个final数组的元素,活着由一个final域引用的HashMap的内容) 将同样对于其他线程是可见的。

对于含有final域的对象,初始化安全性可以防止对象的初始引用被重排序到构造过程之前。当构造函数完成时,构造函数对final域的所有写入操 作,以及对通过这些域可以到达的任何变量的写入操作,都将被“冻结”,并且任何活的该对象引用的线程都至少能确保看到被冻结的值。对于通过final域可 到达的初始变量的写入操作,将不会与构造过程后的操作一起被重排序。

初始化安全性只能保证通过final域可达的值从构造过程完成时开始的可见性。对于通过非final域可达的值,或者在构造过程完成后有可能改变的值,必须采用同步来确保可见性。

基本组件

闭锁

用于线程同步。线程可以堵塞在闭锁的await()方法上,直到countDown()操作将闭锁的计数减少到0。

     /**
* 闭锁
*/
public long countDownLatch() throws InterruptedException {
int nThreads = 5;
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
endGate.countDown();
}
};
t.start();
} long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}

FutureTask

可以从新起的工作线程中获取结果,会堵塞在get()方法,直到结果返回或者线程异常终止。下面只是一个简单的实例,实际使用注意处理抛出的异常。

     /**
* FutureTask
*/
public String futureTask() throws ExecutionException, InterruptedException {
FutureTask<String> future = new FutureTask<>(() -> {
Thread.currentThread().sleep(3000);
return "Hello Future.";
}); Thread thread = new Thread(future);
thread.start();
System.out.println("future.get()");
return future.get();
}

信号量

用于线程的同步,类似PV操作。当计数减少到0的时候acquire()会堵塞,并且直到有其他线程调用release()线程释放信号量或者线程被中断。

 1   /**
* Semaphore
*/
public void semaphore() {
Semaphore sem = new Semaphore(1);
int nThread = 5;
for (int i = 0; i < nThread; i++) {
Thread t = new Thread() {
public void run() {
try {
sem.acquire();
int random = (int) (5 * Math.random());
sleep(random);
System.out.println(currentThread().getId());
sem.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
}

栅栏

与闭锁类似,线程会堵塞在await()方法,直到一定数量的线程到达或者线程被中断。在一定数量的线程到达后,栅栏打开,这批线程可以从堵塞中恢复,然后栅栏再次关闭。

   /**
* 栅栏
* @return
* @throws InterruptedException
*/
public long cyclicBarrier() throws InterruptedException {
int nThreads = 25;
final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("Barrier pass...");
}
}); for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
barrier.await();
sleep(1000);
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
}

稍微高级一点的内容

CompletionService

作为Executor的包装,主要用来在执行多线程的时候获取返回的执行结果进行处理。至于ExecutorService线程池的种类和相关配置请参考JDK文档。

   /**
* CompletionService
*/
public void completionService() {
int nThread = 5;
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService(executor); for (int i = 0; i < nThread; i++) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if (Thread.currentThread().isInterrupted()) {
return "interrupted";
}
int st = (int)(Math.random() * 5000);
Thread.sleep(st);
return Thread.currentThread().getId() + ":" + st;
}
});
} for (int i = 0; i < nThread; i++) {
try {
Future<String> f = completionService.take();
System.out.println("Round" + i);
System.out.println(f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} }
}

设定执行时间的任务

下面这个例子只是说明可以这样做,但是并不建议这样处理,因为在非调用线程取消一个线程是一个不太合理的处理方式,最好是让调用者取消,这样调用者还可以进行下一步的处理。 另外这种方式看起来并不优雅。

   /**
* 取消: 即使任务不响应中断,限时运行的方法仍能够返回到他的调用者。在任务启动以后偶timedRun执行一个限时的join方法,
* 在join返回后,将检查是否有异常抛出,有的话再次抛出异常,由于Throwable在两个线程之间共享,所以设置为volatile
*/
public void timeRun(final Runnable r) throws Throwable {
ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(5);
class RethrowableTask implements Runnable {
private volatile Throwable t;
@Override
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() throws Throwable {
if (t != null) {
throw t;
}
}
} RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(() -> taskThread.interrupt(), 100000, TimeUnit.MILLISECONDS);
taskThread.join(10000);
task.rethrow();
}

下面是通过Future带时间的get()方法实现的,有时限的任务,可以对比一下,下面这个方式显然要优雅很多。

   public void betterTimeRun(Runnable r) throws Throwable {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<?> task = executorService.submit(r);
try {
task.get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// 在finally中被取消
} catch (ExecutionException e) {
throw e.getCause();
} finally {
task.cancel(true);
}
}

在try…catch…finally中取消任务。

UncaughtExcption的处理

Thread的run方法是不抛出非检查异常的,所以外部的try…catch也无法捕获,有时候这会导致一些问题,比如资源没有被释放。

但是我们可以通过Thread的实例方法setUncaughtExceptionHandler去为任何一个线程设置一个 UncaughtExceptionHandler。当然也可以调用Thread类的静态方法setUncaughtExceptionHandler去 为所有线程设置一个UncaughtExceptionHandler。接口示例如下。

 class MyHandler implements Thread.UncaughtExceptionHandler {
/**
* 对于unchecked异常,可以实现UncaughtExceptionHandler接口,当一个线程由于未捕获异常而退出时,JVM会把这个事件告报给应用程序提供的
* UncaughtExceptionHandler,如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.error
*/
@Override
public void uncaughtException(Thread t, Throwable e) {
// Do something...
}
}

如果没有设置自己的Handler,那么JVM的默认行为是将栈信息输出到System.error。

保存被取消的任务

如果需要在关闭线程池的时候保存被取消的任务,那么可以扩展AbstractExcecutorService,对被取消的任务进行纪录,以便下次继续处理。

   /**
* 当关闭线程池时保存被取消的任务
*/
class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor() {
this.exec = Executors.newCachedThreadPool();
} public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated()) {
throw new IllegalStateException("...");
}
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
} @Override
public void execute(Runnable command) {
exec.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
if (isShutdown() && Thread.currentThread().isInterrupted()) {
tasksCancelledAtShutdown.add(command);
}
}
}
});
}
}

可以看到,用一个同步的集合保存被取消的任务,任务的run方法在try的包围中,如果线程被中断或者关闭,那么将相应的任务加入 taskCancelledAtShutdown中,以上代码只覆写了exceute方法,其他方法可以委托给ExecutorService。

典型的工作线程结构

下面是典型的工作线程结构,注意中断监测和异常处理,并且在finally中做一些资源回收的工作。

   /**
* 典型线程池工作者线程结构
*/
public void run() {
Throwable thrown = null;
try {
while (!Thread.currentThread().isInterrupted()) {
// Run task...
}
} catch (Throwable e) {
thrown = e;
} finally {
// Exited the thread...
}
}

ReentrantLock

synchronized使用对象内置的监视器进行同步,这已经可以满足绝大多数的开发情况。如果你还需要更加灵活的锁,比如实现有时限的堵塞,或 者需要在无法获得锁的时候进行一些处理,那么你可以使用ReentrantLock。比如下面的tryLock方法,它在能够获取锁的时候返回true, 不能获取锁的时候返回false,而不会堵塞进程。

   /**
* tryLock() 如果能够获得锁即获得锁并返回true,否则返回false
*/
public void tryLock() {
final int[] data = {0};
ReentrantLock lock1 = new ReentrantLock();
ReentrantLock lock2 = new ReentrantLock(); Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
boolean tag = false;
for(int i = 0; i < 10; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
if (lock1.tryLock()) {
try {
if (lock2.tryLock()) {
try {
data[0]++;
System.out.println("++");
tag = true;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
} if (tag) {
tag = false;
break;
}
}
}
}
}); Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
boolean tag = false;
for(int i = 0; i < 10; i++) {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
if (lock2.tryLock()) {
try {
if (lock1.tryLock()) {
try {
data[0]--;
System.out.println("--");
tag = true;
} finally {
lock1.unlock();
}
}
} finally {
lock2.unlock();
}
} if (tag) {
tag = false;
break;
}
}
}
}
}); thread1.start();
thread2.start(); try {
thread1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(data[0]);
}

Condition

如果有两个条件谓词在同一个同步对象上使用wait()和notify()进行资源的堵塞等待,那么即有可能会发生信号的丢失,导致死锁。对于这种 情况,要么是用notifyAll()通知等待队列中的进程,这样效率会稍微低一些;另一种方法是使用锁的Condition,

   /**
* Condition
*/
class ConditionBoundeBuffer<T> {
protected final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition(); private final T[] items = (T[]) new Object[3];
private int tail, head, count; public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
} public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T x = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}