java并发编程(四)同步工具类

时间:2021-10-07 20:51:46

接上一篇《java并发编程(三)客户端加锁与组合

最近事太多,耽误了两天时间没有写博客,感觉每天不记录下自己的学习东西就想没学一样,这两天的事实在太多,不管是学业上的 还是感情上的,闹心,程序员还是把踏入爱河的事放一放吧~对了 最近接了一本《HTTP权威指南》,想深入学习下,赶紧把Java 并发访问的相关知识 总结完成就开始,另外,在某米音乐的朋友 说 java web 已经很难找工作了,现在是php的天下了,心里一颤,然后继续学习java,你懂的,好了不扯了,继续今天的内容。


java提供了很多的同步工具类,简化了编程过程,而且减少了bug,当然我们完全可以自己写同步工具类,但是 我们不是说好的 要站在巨人的肩膀上 编程的吗。


CountDownLatch

一个闭锁工具类,闭锁状态包括一个计数器,该计数器初始化一个整数,表示需要等待时间发生的数量,countDown方法递减计数器,表示一个事件发生了,await方法等待计数器为零,计数器非零,await会一直阻塞。一个闭锁的例子
public long timeTasks(int nThread, final Runnable task)
throws InterruptedException {

final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThread);

for (int i = 0; i < nThread; i++) {
new Thread(new Runnable() {

@Override
public void run() {
try {
startGate.await();

try {
task.run();
} finally {
endGate.countDown();
}

} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();

}
}).start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();

return end - start;

}

FutureTask

表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于三种状态:等待运行,正在运行和运行完成。 FutureTask.get的行为取决于任务的状态。如果任务完成,那么get会立即得到结果,否则处于阻塞。
public class PreLoader {

private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {

@Override
public ProductInfo call() throws Exception {
return loaderProductInfo();
}
});

private final Thread thread = new Thread(future);

public void start() {
thread.start();
}

public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw launderThrowable(cause);
}
}

private static RuntimeException launderThrowable(Throwable t) {

if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked ", t);

}

ProductInfo loaderProductInfo() {
// 耗时事件
return null;
}

}

PreLoader 类创建了一个FutureTask,当程序需要ProductInfo时,可以调用get方法,如果数据已经加载,立即返回,否则将等待加载完成后返回。

Semaphore

一个计数信号量,用来控制同时访问某个特定资源的操作数,或者同时执行某个指定操作数的数量,还可以用来实现某种资源池,或者对容器施加边界。 Semaphore可以用于实现资源池,例如 数据库连接池。我们可以构建一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为时阻塞而不是失败,并且当池非空时接触阻塞。
public class BoundedHash<T> {

private final Set<T> set;
private final Semaphore sem;

public BoundedHash(int permits) {
set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(permits);
}

public boolean add(T t) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(t);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}

public boolean remove(T t) {
boolean wasRemoved = set.remove(t);
if (wasRemoved)
sem.release();
return wasRemoved;
}

}

CyclicBarrier

栅栏用于实现一些协议,所有线程都必须到达栅栏位置,才能继续执行。(类似于赛跑)书上的一个例子
public class CellularAutomata {

private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;

public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count, new Runnable() {

@Override
public void run() {

}
});
this.workers = new Worker[count];
for (int i = 0; i < count; i++) {
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}

}

private class Worker implements Runnable {
private final Board board;

public Worker(Board board) {
this.board = board;
}

@Override
public void run() {
while (!board.hasConvergeted()) {
for (int i = 0; i < board.getMaxx(); i++)
for (int j = 0; j < board.getMaxy(); j++)
board.setNewValue(i, j, computeValue(i, j));
try {
barrier.await();//等待 直到达到线程数
} catch (InterruptedException | BrokenBarrierException e) {
return;
}
}
}

}

public void start() {
for (int i = 0; i < workers.length; i++) {
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
}

}


构造高效且可伸缩的结果缓存

创建一个 计算的接口
interface Computable<K, V> {

V compute(K arg) throws InterruptedException;

}

public class Memoizer<K, V> implements Computable<K, V> {

private final Map<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
private final Computable<K, V> c;

public Memoizer(Computable<K, V> c) {
this.c = c;
}

@Override
public V compute(K arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {

@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.put(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}

try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw new InterruptedException();
}

}
}

}

认真分析这个例子,设计的很巧妙。

基础部分总结:

  • It’s the mutable state, stupid.1
    All concurrency issues boil down to coordinating access to mutable

    state. The less mutable state, the easier it is to ensure thread safety.

  • Make fields final unless they need to be mutable.

  • Immutable objects are automatically thread-safe.

    Immutable objects simplify concurrent programming tremendously.They are simpler and safer, and can be shared freely without lockingor defensive copying.

  • Encapsulation makes it practical to manage the complexity.

    You could write a thread-safe program with all data stored in globalvariables, but why would you want to? Encapsulating data withinobjects makes it easier to preserve their invariants; encapsulatingsynchronization within objects makes it easier to comply with theirsynchronization policy.

  • Guard each mutable variable with a lock.

  • Guard all variables in an invariant with the same lock.

  • Hold locks for the duration of compound actions.

  • A program that accesses a mutable variable from multiple threadswithout synchronization is a broken program.

  • Don’t rely on clever reasoning about why you don’t need to synchro-nize.

  • Include thread safety in the design process—or explicitly documentthat your class is not thread-safe.

  • Document your synchronization policy. 



  • 可变状态是至关重要的
所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。
  • 尽量将域生命为final类型,除非需要它们是可变的。
  • 不可变对象一定是线程安全的。
不可变对象能极大地降低并发编程的复杂性,它们简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制。
  • 封装有助于管理复杂性
在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁来保护每个可变对象。
  • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。
  • 在执行符合操作期间,要持有锁。
  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。
  • 不要故作聪明的推断出不需要使用同步。
  • 在设计过程中考虑线程安全,或者在文档中明确地指出不是线程安全的。
  • 将同步策略文档化。