JAVA 并发编程学习总结

时间:2022-11-25 18:03:45

一. 相关概念

1.同步和异步

同步是指在发出一个功能调用时,在没有得到结果之前,该调用就不返回

异步是指当一个异步过程调用发出后,调用者不需要等待结果返回随时可以进行下一个请求,在后台会开启线程继续执行该任务,该任务完成后会通过状态、通知和回调来通知调用者

2.并发和并行

并发: 两个或多个事件在同一个时间段内发生

JAVA 并发编程学习总结

并行:两个或多个事件在同一时刻发生

JAVA 并发编程学习总结

3.临界区和临界资源

临界资源: 一次仅允许一个线程使用的共享资源
临界区: 每个线程中访问临资源的那段程序称为临界区,每次只允许一个线程进入临界区

4.阻塞和非阻塞

阻塞: 在调用结果返回之前,当前线程会被挂起。函数只有在得到结果只会才会返回

非阻塞: 在不能立刻得到结果之前 该函数不会阻塞当前线程而是会立即返回

5.死锁、活锁和饥饿

死锁: 两个或两个线程在执行过程中,由于竞争资源造成的阻塞的过程。没有外力推动的情况下无法进行下去。
产生死锁有四个必要条件:

  • (1) 互斥条件:一个资源每次只能被一个进程使用。
  • (2) 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  • (3) 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
  • (4) 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

举一个很常见的例子,有两个线程ThreadA和ThreadB。ThreadA先获取变量A的锁,再获取变量B的锁;而ThreadB先获取变量B的锁再获取变量A的锁,由于线程的调度是随机的,那么有可能ThreadA先获取了A的锁,而此时ThreadB获取了变量B的锁,ThreadA阻塞等待ThreadB释放B锁,而ThreadB又阻塞等待ThreadA释放A锁,两个线程陷入无限期的等待,也就是死锁

package com.hqq.day25.concurrency;

/**
* DeadLock
* 死锁Demo
* Created by heqianqian on 2017/8/12.
*/

public class DeadLock {
private static final Object A = new Object();
private static final Object B = new Object();

public static void main(String[] args) {
new ThreadA().start();
new ThreadB().start();
}

private static class ThreadA extends Thread {

@Override
public void run() {
try {
System.out.println("Thread A Try to Lock A");
synchronized (A) {
System.out.println("Thread A Locked A");
System.out.println("Thread A Try to Lock B...");
Thread.sleep(1000);
synchronized (B) {
System.out.println("Thread A Locked B");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class ThreadB extends Thread {

@Override
public void run() {
try {
System.out.println("Thread B Try to Lock A");
synchronized (B) {
System.out.println("Thread B Locked A");
System.out.println("Thread B Try to Lock B...");
Thread.sleep(1000);
synchronized (A) {
System.out.println("Thread B Locked B");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

运行结果

Thread A Try to Lock A
Thread A Locked A
Thread A Try to Lock B...
Thread B Try to Lock A
Thread B Locked A
Thread B Try to Lock B...

两个线程都陷入了等待……

相对来说死锁还是比较容易判断的,而另一种不容易发现的就是活锁了

活锁:指事物1可以使用资源,但它让其他事物先使用资源;事物2可以使用资源,但它也让其他事物先使用资源,于是两者一直谦让,都无法使用资源。

因此避免活锁的简单方法是采用先来先服务的策略。当多个事务请求*同一数据对象时,*子系统按请求*的先后次序对事务排队,数据对象上的锁一旦释放就批准申请队列中第一个事务获得锁。

饥饿如果事务T1*了数据R,事务T2又请求*R,于是T2等待。T3也请求*R,当T1释放了R上的*后,系统首先批准了T3的请求,T2仍然等待。然后T4又请求*R,当T3释放了R上的*之后,系统又批准了T4的请求……T2可能永远等待,这就是饥饿。

二. 并发级别

JAVA 并发编程学习总结

1.阻塞并发 Blocking algoithms

是并发级别最低的同步算法 同一时刻只能一个线程访问临界区资源

2.无阻塞并发 Obstruction-freedom

是指在任何时间点,一个孤立运行线程的每一个操作可以在有限步内结束。只要没有竞争,线程可以持续运行。一旦共享数据被修改,就会终止已完成的部分操作并进行回滚

3.无锁并发 Lock-freedom

Lock-freedom是指整个系统作为一个整体一直运行下去,系统内部单个线程某段时间内可能饥饿,因此无锁并发保证每次都有一个线程胜出,不会进入无限期的等待。可以用CAS实现

4.无等待并发 Wait-freedom

Wait-freedom是指每一个线程都一直运行下去而无需等待外部条件。整个流程操作都在一个有限步的步骤内完成。该级别是最高的并发级别,没有任何阻塞

三. 并行的两个定律

1.Amdahl定律

定义了串行系统并行化后加速比的计算公式和上限

加速比=优化前耗时/优化后耗时

举个例子

JAVA 并发编程学习总结

加速比=优化前系统耗时/优化后系统耗时=500/400=1.25

JAVA 并发编程学习总结

上图是加速比和处理器个数的关系

2.Gustafson定律

执行时间=a[串行时间]+b[并行时间]
总执行时间=a+n*b[n为处理器个数]
加速比=(a+n*b)/(a+b)
串行比例 F = a / (a+b)

四. 线程和进程

进程(Process):

  • 具有一定独立功能的程序
  • 关于某个数据集合上的一次运行活动
  • 系统进行资源分配和调度的一个独立单位.

线程(Thread):

  • 进程的一个实体
  • 是CPU调度和分派的基本单位
  • 比进程更小的能独立运行的基本单位
  • 线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

二者的关系:

一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.
相对进程而言,线程是一个更加接近于执行体的概念,它可以与同进程中的其他线程共享数据,但拥有自己的栈空间,拥有独立的执行序列。

区别:

进程和线程的主要差别在于它们是不同的操作系统资源管理方式。

进程有独立的地址空间,一个进程崩溃后,在保护模式下不会对其它进程产生影响,而线程只是一个进程中的不同执行路径。线程有自己的堆栈和局部变量,但线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉,所以多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些。但对于一些要求同时进行并且又要共享某些变量的并发操作,只能用线程,不能用进程。

1) 简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
2) 线程的划分尺度小于进程,使得多线程程序的并发性高。
3) 另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
4) 线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
5) 从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。

优缺点

线程和进程在使用上各有优缺点:
线程执行开销小,但不利于资源的管理和保护;而进程正相反。
同时,线程适合于在SMP机器上运行,而进程则可以跨机器迁移。

1.线程中断 //TODO 待补充

使用interrupt()方法中断线程只是打了一个停止的标记 并不是真正停止线程
其他方法:
boolean isInterrupted(): 判断线程是否被中断
boolean interrupted(): 判断是否被中断 并清除当前中断状态

2.yield()和join()方法

yield()方法:

  /**
* A hint to the scheduler that the current thread is willing to yield
* its current use of a processor. The scheduler is free to ignore this
* hint.
*
* <p> Yield is a heuristic attempt to improve relative progression
* between threads that would otherwise over-utilise a CPU. Its use
* should be combined with detailed profiling and benchmarking to
* ensure that it actually has the desired effect.
*
* <p> It is rarely appropriate to use this method. It may be useful
* for debugging or testing purposes, where it may help to reproduce
* bugs due to race conditions. It may also be useful when designing
* concurrency control constructs such as the ones in the
* {@link java.util.concurrent.locks} package.
*/
public static native void yield();

yield()是一个静态的本地方法,表示的的那个线程愿意放弃CPU的使用权并且和其他线程一起竞争CPU
因此和sleep()方法的区别就是 调用yield()的线程还是有可能获得CPU的使用权的

join()方法:

/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

join()方法是用途是等待当前线程运行结束。
查看源码知道,默认情况下即没有设置join()的时间时 只要当前还有运行的线程 则等待该线程运行结束后再运行。
此处只有wait() 而我们并没有看到notify() 原因是每个线程结束后JVM会自动调用notifyAll()

while (isAlive()) {
wait(0);
}

例子: 如果使Main方法在前面四个线程顺序执行完再执行?
可以使用join()简单实现

package com.hqq.day25.concurrency.common;

/**
* JoinDemo2
* Created by heqianqian on 2017/8/12.
*/

public class JoinDemo2 {

public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " running....");
}
};
Thread thread1 = new Thread(runnable);
thread1.setName("Thread1");
Thread thread2 = new Thread(runnable);
thread2.setName("Thread2");
Thread thread3 = new Thread(runnable);
thread3.setName("Thread3");

thread1.start();
thread2.start();
thread3.start();

thread1.join();
thread2.join();
thread3.join();

System.out.println("Main Ended!");
}
}

运行结果

Thread1 running....
Thread2 running....
Thread3 running....
Main Ended!

join()和sleep()的区别

join()内部是使用wait()来实现的 因此也就等同于wait()和sleep()的区别,也就是wait()是会释放锁进入等待池等待被唤醒,而sleep()不会释放锁只是睡眠到一定时间又继续执行

守护线程

守护线程用来在后台完成一些系统服务 当所有非守护线程运行结束 守护线程也会终止

使用setDeamon(boolean) 设置当前线程是否是守护线程

五. 内存模型和线程安全

1. 原子性

  • 一个操作是不可中断的 即使在多个线程一起执行时 一个操作一旦开始 也会是不可中断的

2. 有序性

  • 指令的执行顺序和编写的源代码的顺序相同
    实际在执行程序时为了提高性能 编译器和处理器会对指令做重排序

重排序分三种类型:

  1. 编译器优化的重排序:
    编译器在不改变单线程序语义的前提下 可以重新安排语句的执行顺序
  2. 指令级并行的重排序:
    将多条执行重叠执行 如果不存在数据依赖性 处理器可以改变语句对应机器指令的执行顺序
  3. 内存系统的重排序:
    由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行

3.可见性

  • 当一个线程修改了一个共享变量的值 其他线程可以立即知道这个修改

4.Happens-Before

满足一下原则:

  • a) 程序顺序原则: 一个线程内保证语义的串行性
  • b) volatile原则: volatile变量的写优先于读
  • c ) 锁原则: 对于一个监视器的解锁,happens-before于随后对这个监视器的加锁
  • d) 传递性原则: A优先于B B优先于C 可以得出A优先于C
  • 线程的start()优先于它的每一个操作
  • 线程的所有操作优先于线程的终结join()
  • 对象的构造函数优先于finalize()函数

5.五种实现同步[通信]的机制

  • wait()/notify() 方法
  • await()/signal()方法
  • BlockingQueue阻塞队列方法
  • Semaphore信号量
  • PipedInputStream和PipedOutputStream管道通信的方法

1.wait()/notify() 方法

package com.hqq.day21.communication.wait_notify.alternate;

/**
* Alternate
* 交替运行线程
* Created by heqianqian on 2017/8/7.
*/

public class Alternate {

private volatile boolean isSolid;

public synchronized void drawSolid() {
try {
for (int i = 0; i < 5; i++) {
while (!isSolid) {
this.wait();
}
for (int j = 0; j < 5; j++) {
System.out.println("★★★★★");
}
isSolid = false;
this.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}

public synchronized void solidHollow() {
try {
for (int j = 0; j < 5; j++) {
while (isSolid) {
this.wait();
}
for (int i = 0; i < 5; i++) {
System.out.println("☆☆☆☆☆");
}
isSolid = true;
this.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.hqq.day21.communication.wait_notify.alternate;

/**
* App
* Created by heqianqian on 2017/8/7.
*/

public class App {

public static void main(String[] args) {
Alternate alternate = new Alternate();
new Thread(alternate::drawSolid).start();
new Thread(alternate::solidHollow).start();
}

}

运行结果

☆☆☆☆☆
☆☆☆☆☆
★★★★★
★★★★★
☆☆☆☆☆
☆☆☆☆☆
★★★★★
★★★★★

2.await()/signal()方法

await()/signal()方法和wait()/notify() 方法的区别:

  1. wait()/notify() 方法只能在synchronized同步代码块中使用,而await()/signal()方法一般是结合Lock使用
  2. wait()/notify() 是Object类的方法 所有类都有 而await()/signal()方法只有部分类采用 比如Condition

3.BlockingQueue阻塞队列

BlockingQueue内部是使用await()/signal()来实现的 用于阻塞的方法是put()和take()方法

当使用put()添加元素时 如果发现当前队列元素已经是最大时自动阻塞
当使用take()获取元素时 如果发现当前队列为空 会自动阻塞

package com.hqq.day15.blocking_queue;

import java.util.concurrent.BlockingQueue;

/**
* Producer
* 功能:从blockingqueue中放入数据
* Created by heqianqian on 2017/7/26.
*/

public class Producer<T> implements Runnable {

private BlockingQueue<T> blockingQueue;

public Producer(BlockingQueue<T> blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
@SuppressWarnings("unchecked")
public void run() {
try {
blockingQueue.put((T) Integer.valueOf(1));
Thread.sleep(1000);
blockingQueue.put((T) Integer.valueOf(2));
Thread.sleep(1000);
blockingQueue.put((T) Integer.valueOf(3));
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
package com.hqq.day15.blocking_queue;

import java.util.concurrent.BlockingQueue;

/**
* Customer
* 功能:从blockingqueue中取数据
* Created by heqianqian on 2017/7/26.
*/

public class Customer<T> implements Runnable {

private BlockingQueue<T> blockingQueue;

public Customer(BlockingQueue<T> blockingQueue) {
this.blockingQueue = blockingQueue;
}

@Override
public void run() {
try {
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.hqq.day15.blocking_queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* BlockingQueueExample
* Created by heqianqian on 2017/7/26.
*/

public class BlockingQueueExample {

public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(1024);

Producer<Integer> producer = new Producer<>(blockingQueue);
Customer<Integer> customer = new Customer<>(blockingQueue);

new Thread(producer).start();
new Thread(customer).start();
}

}

运行结果:

1
2
3

4. Semaphore 信号量

概念就不赘述了 这里说一下Semaphore和互斥量Mutex的区别,一般来说我们说互斥量是二元信号量 也就是Semaphore阈值为1的情况 但是二者还是存在一定的区别:

信号量在整个系统可以被任意线程获取并释放 同一个信号量可以被系统中的其他线程释放 而互斥量则要求哪个线程获取的就由哪个线程释放

例子:

package com.hqq.day15.semaphore;

import java.util.concurrent.Semaphore;

/**
* CountLetterRunnable
* 使用Semaphore进行线程间的通信
* Created by heqianqian on 2017/7/27.
*/

public class CountLetterRunnable implements Runnable {

private Semaphore semaphore;

private int times = 0;

public CountLetterRunnable(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
while (times < 50) {
semaphore.acquire();
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " print " + i+" for "+ times+" times");
}
semaphore.release();
times++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.hqq.day15.semaphore;

import java.util.concurrent.Semaphore;

/**
* CountNumberRunnable
* 使用Semaphore进行线程间的通信
* Created by heqianqian on 2017/7/27.
*/

public class CountNumberRunnable implements Runnable {


private Semaphore semaphore;

private int times = 0;

public CountNumberRunnable(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
while (times < 50) {
semaphore.acquire();
for (int i = 'a'; i < 'z'; i++) {
System.out.println(Thread.currentThread().getName() + " print " + (char) i + " for " + times + " times");
}
semaphore.release();
times++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.hqq.day15.semaphore;

import java.util.concurrent.Semaphore;

/**
* SemaphoreRunnable
* Created by heqianqian on 2017/7/27.
*/

public class SemaphoreRunnable implements Runnable {

private Semaphore semaphore;

public SemaphoreRunnable(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " executed!");
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
package com.hqq.day15.semaphore;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

/**
* SemaphoreExample
* Created by heqianqian on 2017/7/27.
*/

public class SemaphoreExample {

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);

//SemaphoreRunnable s1 = new SemaphoreRunnable(semaphore);
//SemaphoreRunnable s2 = new SemaphoreRunnable(semaphore);
//
//new Thread(s1).start();
//new Thread(s2).start();

CountNumberRunnable num = new CountNumberRunnable(semaphore);
CountLetterRunnable letter = new CountLetterRunnable(semaphore);

new Thread(num).start();
new Thread(letter).start();
}
}

5.管道通信

管道通信分为 PipedInputStream和PipedOutputStream 和PipedReader和PipedWriter两组 这里就只举PipedReader和PipedWriter的例子

package com.hqq.day21.communication.pipe.character;

import java.io.IOException;
import java.io.PipedReader;

/**
* ReaderThread
* 使用字符管道读取数据
* Created by heqianqian on 2017/8/7.
*/

public class ReaderThread extends Thread {

private PipedReader reader = new PipedReader();

public ReaderThread(PipedReader reader) {
this.reader = reader;
}

public void readData() {
char[] chars = new char[1024];
try {
reader.read(chars);
String data = new String(chars);
System.out.println(Thread.currentThread().getName() + " Read[" + data + "]");
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
readData();
}
}
package com.hqq.day21.communication.pipe.character;

import java.io.IOException;
import java.io.PipedWriter;

/**
* WriterThread
* 使用字符管道写数据
* Created by heqianqian on 2017/8/7.
*/

public class WriterThread extends Thread {

private PipedWriter pipedWriter;

public WriterThread(PipedWriter pipedWriter) {
this.pipedWriter = pipedWriter;
}

public void writeData() {
String data = "你好";
try {
pipedWriter.write(data);
System.out.println(Thread.currentThread().getName() + " Write [" + data + "]");
pipedWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {
writeData();
}
}
package com.hqq.day21.communication.pipe;

import com.hqq.day21.communication.pipe.character.ReaderThread;
import com.hqq.day21.communication.pipe.character.WriterThread;
import com.hqq.day21.communication.pipe.stream.ReadStreamThread;
import com.hqq.day21.communication.pipe.stream.WriteStreamThread;

import java.io.*;

/**
* App
* Created by heqianqian on 2017/8/7.
*/

public class App {
public static void main(String[] args) throws IOException, InterruptedException {
//Byte Stream
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();

pis.connect(pos);

ReadStreamThread readStreamThread = new ReadStreamThread(pis);
WriteStreamThread writeStreamThread = new WriteStreamThread(pos);

writeStreamThread.start();
readStreamThread.start();

//Character Stream
PipedReader reader = new PipedReader();
PipedWriter writer = new PipedWriter();
reader.connect(writer);

ReaderThread readerThread = new ReaderThread(reader);
WriterThread writerThread = new WriterThread(writer);

writerThread.start();
readerThread.start();
}
}

运行结果

Thread-3 Write [你好]
Thread-2 Read[你好]

6.读写锁问题

共享数据时要求:

  • 允许多个读操作同时进行
  • 读操作和写操作不可同时进行
  • 写操作和写操作不可同时进行

读写锁ReentrantReadWriteLock

具有特性:
a) 可重入性:内部的WriteLock可以获取ReadLock 反之不成立
b) 可降级:WriteLock可以降级成ReadLock 反之不成立
c) WriteLock支持Condition 而ReadLock不支持 使用时会抛出UnSupportedOperationException

六. 无锁类

实现机制CAS(CompareAndSet) 使用CPU指令原语cmpXchg实现

  • 1.AtomicInteger
  • 2.Unsafe
  • 3.AtomicReference
    模板类 封装任意类型数据
  • 4.AtomicStampedReference
    内部Pair封装了值和时间戳 用时间戳来标识每次改变
  • 5.AtomicIntegerArray
    内部封装了整型数组
  • 6.AtomicIntegerFieldUpdater
    作用:让普通变量也具有原子性

七. 并发包

//TODO 待补充 TAT

1.ReentrantLock 完全互斥锁

使用Condition实现等待通知[Condition 是jdk5出现的技术 可以实现多路通知功能]

[多路通知:在一个Lock对象里可以创建多个Condition(对象监视器)实例 线程对象可以注册在指定的Condition中 从而可以有选择的进行线程通知 在调度线程上更加灵活]

wait/notify调度的线程是由JVM随机通知的 而ReentrantLock+Condition可以有选择性的通知

  • Object的wait()方法相当于Condition的await()方法
  • Object的wait(long)方法相当于Condition的await(long,TimeUnit)方- 法
  • Object的notify()方法相当于Condition的signal()方法
  • Object的notifyAll()方法相当于Condition的signalAll()方法

公平锁和非公平锁;

  • 公平锁:线程获取锁的顺序是按照加锁的顺序来分配的[FIFO]
  • 非公平锁:线程获取锁的顺序是抢占机制,随机获得锁的

其他方法:

  • getHoldCount():查询当前线程保持此锁定的个数 调用lock()方法的次数
  • getQueueLength():返回正等待获取此锁定的线程估计数
  • getWaitQueueLength(Condition):返回等待与此锁定相关给定条件Condition的线程估计数
    比如有5个线程 每个线程都调用了Condition的await() 那么getWaitQueueLength()返回的就是5
  • boolean hasQueuedThread(Thread):查询指定的线程是否正在等待获取此锁定
  • boolean hasQueuedThreads():查询是否有线程正在等待此锁定
  • boolean hasWaiters(Condition):查询是否有线程正在等待和此锁定关的condition的条件
  • isFair():公平锁 [ReentrantLock默认是非公平锁]
  • isHeldByCurrentThread():查询当前线程是否保持此锁定
  • isLocked():查询此锁定是否由任意线程保持
  • void lockInterruptibly():如果当前线程未被中断.则获取锁定.如果已经被中断则抛出异常
  • void tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取该锁定
  • void tryLock(Long,TimeUnit):如果锁定在给定的等待时间内没有被另一个线程保持,且当前线程未被中断.则获取该锁定
  • void awaitUninterruptibly():线程等待的时候可以不被打断
  • void awaitUntil():线程在等待时间到达之前 可以被其他线程提前唤醒

[ReentrantReadWriteLock]:读写互斥锁 读和读之间不互斥 提高代码运行速度
和读操作有关的锁:共享锁 和写操作有关的锁:排他锁

具有特性:

a) 可重入性
b) 可中断性
c) 可限时性
d) 公平性

2.Timer 定时器

可以用于安卓中的轮询动画

作用:主要负责计划任务的功能[在指定的时间开始执行某一个任务]
主要负责设置计划任务 封装任务的类是TimerTask类
Timer:

  • schedule(TimerTsk,Date):在指定的日期执行一次某任务
  • schedule(TimerTsk,long):延迟long毫秒后执行任务
  • schedule(TimerTsk,long,long):long):毫秒之后按照指定间隔无线循环的执行某一任务
  • cancel():将任务队列中的全部队列清除
  • scheduleAtFixedRate(TimerTask,Date,long):和schedule的区别只在于不延迟的情况

不延迟的情况下:
schedule:如果执行任务的时间没有被延迟,下一次任务的执行时间参考的是上一次任务[开始]时间
scheduleAtFixedRate:如果执行任务的时间没有被延迟,下一次任务的执行时间参考的是上一次任务[结束]时间

追赶执行性

schedule:不具有 不执行
scheduleAtFixedRate:具有 追赶执行

TimerTask:

cancel():把自身从任务队列中清除

package com.hqq.day21.timer;

import java.text.SimpleDateFormat;
import java.util.Timer;
import java.util.Date;
import java.util.TimerTask;

/**
* TimerExample
* Created by heqianqian on 2017/8/8.
*/

public class TimerExample {

private static Timer timer = new Timer(false);//设置为守护进程后 task内的任务也不再执行

public static void main(String[] args) throws InterruptedException {
System.out.println("当前时间:"+System.currentTimeMillis());
TimerTask timerTask1 = new TimerTask() {
@Override
public void run() {
System.out.println("运行了,时间为" + System.currentTimeMillis());
}
};
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
System.out.println("我也运行了,时间为" + System.currentTimeMillis());
}
};
TimerTask recycleTask = new TimerTask() {
@Override
public void run() {
System.out.println("间隔周期执行:"+System.currentTimeMillis());
}
};
TimerTask cancelTask = new TimerTask() {
@Override
public void run() {
System.out.println("任务执行!");
this.cancel();
}
};
//timer.schedule(timerTask1, 2000);//delay大于0 延迟delay执行 如果delay的时间早于当前时间 则立即执行
//timer.schedule(timerTask2,100);
//间隔周期执行
//timer.schedule(recycleTask,1000,1000);
//测试TimerTask的cancel()方法 把自身从任务队列中清除
timer.schedule(cancelTask,1000,1000);
}
}

运行结果:

当前时间:1502584381969
任务执行!

3.CountDownLatch 倒数计时器

计数器的初始值为线程的数量,每当一个线程完成了自己的任务,计数器的值就会减1,当计数器的值达到0时,它表示所有的线程都已经完成了任务 然后在闭锁上等待的线程就可以恢复执行任务。

例子:

package com.hqq.day15.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
* Decrementer
* Created by heqianqian on 2017/7/26.
*/

public class Decrementer implements Runnable {

private CountDownLatch countDownLatch;

public Decrementer(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
countDownLatch.countDown();
System.out.println("Count Down");
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("Count Down");
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("Count Down");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.hqq.day15.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
* Waiter
* Created by heqianqian on 2017/7/26.
*/

public class Waiter implements Runnable{

private CountDownLatch countDownLatch;

public Waiter(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CountDownLatch Finish!");
}
}
package com.hqq.day15.countdown_latch;

import java.util.concurrent.CountDownLatch;

/**
* CountDownLatchDemo
* Created by heqianqian on 2017/7/26.
*/

public class CountDownLatchDemo {

public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);

Waiter waiter = new Waiter(countDownLatch);
Decrementer decrementer = new Decrementer(countDownLatch);

new Thread(waiter).start();
new Thread(decrementer).start();
}

}

运行结果:

Count Down
Count Down
Count Down
CountDownLatch Finish!

4. CyclicBarrier

初始时规定一个数目,然后计算调用CyclicBarrier.await()进入等待的线程数,当线程数达到了这个数目 所有等待的线程被唤醒并继续

所有的线程必须到齐后才可以一起继续运行

初始时可以带一个Runnable参数,在线程数达到该数目其他线程被唤醒之前至执行。

例子:

package com.hqq.day15.cyclic_barrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* CyclicBarrierRunnable
* Created by heqianqian on 2017/7/27.
*/

public class CyclicBarrierRunnable implements Runnable {

private CyclicBarrier cyclicBarrier1;
private CyclicBarrier cyclicBarrier2;

public CyclicBarrierRunnable(CyclicBarrier cyclicBarrier1, CyclicBarrier cyclicBarrier2) {
this.cyclicBarrier1 = cyclicBarrier1;
this.cyclicBarrier2 = cyclicBarrier2;
}

@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " awaiting at barrier1");
cyclicBarrier1.await();

Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " awaiting at barrier2");
cyclicBarrier2.await();

Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " done!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
package com.hqq.day15.cyclic_barrier;

import java.util.concurrent.CyclicBarrier;

/**
* CyclicBarrierExample
* Created by heqianqian on 2017/7/27.
*/

public class CyclicBarrierExample {

public static void main(String[] args) {
Runnable r1 = new Runnable() {
@Override
public void run() {
System.out.println("Barrier1 executed!");
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
System.out.println("Barrier2 executed!");
}
};

CyclicBarrier c1 = new CyclicBarrier(2,r1);
CyclicBarrier c2 = new CyclicBarrier(2,r2);

CyclicBarrierRunnable cbr1 = new CyclicBarrierRunnable(c1,c2);
CyclicBarrierRunnable cbr2 = new CyclicBarrierRunnable(c1,c2);

new Thread(cbr1).start();
new Thread(cbr2).start();
}

}

运行结果:

Thread-0 awaiting at barrier1
Thread-1 awaiting at barrier1
Barrier1 executed!
Thread-0 awaiting at barrier2
Thread-1 awaiting at barrier2
Barrier2 executed!
Thread-1 done!
Thread-0 done!

和CountDownLatch的作用类似 区别是CountDownLatch倒数到0之后不可以继续使用 而CyclicBarrier可以继续使用

八. 线程池

//TODO 待补充 QAQ
1. 线程池的种类

  • new FixedThreadPool: 固定数量的线程池
  • new SingleThreadExecutor:单一线程池
  • new CachedThreadPool:缓存线程池 根据需求改变大小
  • new ScheduledThreadPool:任务调度线程池

2.Fork/Join 分治思想

fork/join 类似 Map/Reduce算法

区别是fork/join只有的必要的时候才把任务分割成一个个的小任务,而map/reduce总是在一开始就执行第一步分割。因此fork/join适合JVM内线程级别 而map/reduce适合分布式系统

package com.hqq.day15.fork_join_pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

/**
* MyRecursiveAction
* Created by heqianqian on 2017/7/27.
*/

public class MyRecursiveAction extends RecursiveAction {

private int workLoad = 0;

public MyRecursiveAction(int workLoad) {
this.workLoad = workLoad;
}

@Override
protected void compute() {
if (workLoad > 20) {
System.out.println("Split WorkLoad : " + this.workLoad);
List<MyRecursiveAction> actions = new ArrayList<>();
actions.addAll(createSubTask());
for (MyRecursiveAction action : actions) {
action.fork();
}
} else {
System.out.println("Finish Task By MySelf " + workLoad);
}
}

public List<MyRecursiveAction> createSubTask() {
List<MyRecursiveAction> actions = new ArrayList<>();

MyRecursiveAction action1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction action2 = new MyRecursiveAction(this.workLoad / 2);

actions.add(action1);
actions.add(action2);

return actions;
}
}
package com.hqq.day15.fork_join_pool;

import java.util.concurrent.ForkJoinPool;

/**
* ForkJoinPoolExample
* Created by heqianqian on 2017/7/27.
*/

public class ForkJoinPoolExample {

public static void main(String[] args) {
//1.使用无返回的RecursiveAction
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyRecursiveAction action = new MyRecursiveAction(100);
forkJoinPool.invoke(action);
}
}

执行结果:

Split WorkLoad : 100
Split WorkLoad : 50
Split WorkLoad : 50
Split WorkLoad : 25

九. 锁优化

几种优化机制

  • a) 减少锁持有的时间
  • b) 减少锁粒度
  • c) 锁分离
  • d) 锁粗化
  • e)锁消除

1.减少锁持有的时间
减少其他线程等待的时间 只在需要线程安全的代码上加锁
2.减少锁粒度
将大对象拆分成小对象 对每个小对象加锁 降低锁竞争 最典型的例子就是ConcurrentHashMap的分段锁 只在数据所在的JDK7中是Segment,JDK8中是Node上加锁
JAVA 并发编程学习总结
3.锁分离
最典型的例子是读写锁ReadWriteLock 实现读操作和写操作分离
4.锁粗化
如果虚拟机探测到有这样一串零碎的操作都对同一个对象加锁,将会把加锁同步的范围扩展到整个操作序列的外部,这样就只需要加锁一次就够了
例子:
第一个例子

 for (int i = 0; i < 10; i++) {
synchronized (lock){
...
}
}

会优化成

synchronized (lock){
for (int i = 0; i < 10; i++) {
...
}
}

第二个例子

synchronized (lock1){
//...
}
//..。无需同步但很快能执行完的代码
synchronized (lock2){
//...
}

会优化成

synchronized (lock1){
//...
}

5.锁消除

编译器级别的优化
在JIT中 发现不可能有共享的对象会消除他们的锁
常见情况是JDK中自带锁机制的对象如Vector和StringBuffer,当编译的时候发现这些对象没有处于线程不安全的状态 会消除他们的锁操作

package com.hqq.day26.lock_elimate;

/**
* EliminateLockDemo
* 测试锁消除机制
* Created by heqianqian on 2017/8/13.
*/

public class EliminateLockDemo {

public static void main(String[] args) {
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
concatString("Let it"," Crash!");
}
System.out.println("Cost "+(System.currentTimeMillis()-startTime)+" mills");
}

private static void concatString(String str1,String str2){
StringBuffer buffer = new StringBuffer();
buffer.append(str1);
buffer.append(str2);
System.out.println(buffer.toString());
}

}

不是锁消除的情况下运行结果:

...
Cost 15 mills

使用-XX:+DoEscapeAnalysis -XX:+EliminateLocks 打开锁消除

...
Cost 0 mills

虚拟机内的锁优化

对象头的概念 Mark Word

HotSpot虚拟机中,对象在内存中存储的布局可以分为三块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。

Mark Word 用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等等。JVM 对象头一般占用两个机器码,在 32-bit JVM 上占用 64bit, 在 64-bit JVM 上占用 128bit 即 16 bytes(暂不考虑开启压缩指针的场景)。另外,如果对象是一个 Java 数组,那在对象头中还必须有一块用于记录数组长度的数据,因为虚拟机可以通过普通 Java 对象的元数据信息确定 Java 对象的大小,但是从数组的元数据中无法确定数组的大小。

JAVA 并发编程学习总结

1.偏向锁–偏向当前已经持有锁的线程

在无竞争的情况下,之前获得锁的线程再次请求锁时,那么该线程不用再次获得锁就可直接进入同步块 当其他线程请求锁时 偏向结束
JVM默认启用偏向锁

2.轻量级锁—减少线程互斥的几率

利用CPU的原语CAS 在线程进入互斥之前进行补救
轻量级锁失败会转成重量级锁 即使用操作系统层面的互斥 也有可能尝试自旋锁

3.自旋锁 – 不断尝试请求获取锁

当请求的锁被其他线程占有时 当前线程不会挂起 而是会不断尝试请求获取锁

总结:

偏向锁:避免某个线程反复获取/释放同一把锁的资源消耗
而轻量级锁和自旋锁都是为了防止调用操作系统层面的互斥