并发编程(二)concurrent 工具类

时间:2023-02-06 21:18:25

并发编程(二)concurrent 工具类

一、CountDownLatch

经常用于监听某些初始化操作,等初始化执行完毕后,通知主线程继续工作。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest extends Thread {
private final static CountDownLatch countDown = new CountDownLatch(2); // (1) @Override
public void run() {
// 唤醒线程线程
countDown.countDown(); // (2)
System.out.println(Thread.currentThread().getName() + "执行完毕...");
} public static void main(String[] args) { new Thread(new CountDownLatchTest()).start();
new Thread(new CountDownLatchTest()).start();
try {
Thread.sleep(1000);
countDown.await(); // (3)
System.out.println(Thread.currentThread().getName() + "继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
  1. 声明一个 CountDownLatch 对象,参数 2 表示被阻塞的线程需要被唤醒再次才能执行。

    final CountDownLatch countDown = new CountDownLatch(2);
  2. countDown() 调用两次后,主线程才会继续执行

    countDown.countDown();
  3. 阻塞当前线程-main

    countDown.await();
  4. 执行结果如下:

    Thread-1执行完毕...
    Thread-0执行完毕...
    main继续执行... // Thread-0, Thread-1 执行完成才会继续执行主线程

二、CyclicBarrier

假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个没有准备了,大家都等待。

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseCyclicBarrier { static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name; public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 准备OK.");
barrier.await(); //(1)
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
} public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2); // (2)
ExecutorService executor = Executors.newFixedThreadPool(2); executor.submit(new Thread(new Runner(barrier, "Thread-1")));
executor.submit(new Thread(new Runner(barrier, "Thread-2"))); executor.shutdown();
}
}
  1. await() 阻塞当前的线程。

    barrier.await();
  2. 声明一个 CyclicBarrier 对象,参数 2 表示 barrier 必须有两个线程都准备好了才能执行。

    CyclicBarrier barrier = new CyclicBarrier(2);
  3. 执行结果如下:

    Thread-1 准备OK.
    Thread-2 准备OK.
    Thread-1 Go!!
    Thread-2 Go!!
  4. 修改 CyclicBarrier barrier = new CyclicBarrier(3) 后这两个线程都会被阻塞, 执行结果如下:

    Thread-1 准备OK.
    Thread-2 准备OK.

三、Future

future模式请参考这里

四、Semaphore

Semaphore 信号量非常适合高并发访问。

public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5); // (1)
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire(); // (2)
System.out.println("Accessing: " + NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release(); // (3)
} catch (InterruptedException e) {
;
}
}
};
exec.execute(run);
} try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(semp.getQueueLength()); // 退出线程池
exec.shutdown();
}
}
  1. 声明一个 Semaphore 对象,参数 5 表示最多有5个线程同时访问。

    final Semaphore semp = new Semaphore(5);
  2. semp.acquire() 获取 semp 对象,如果超过5个线程,那么其余的线程就会阻塞,直到有线程执行完毕。

  3. semp.release() 释放 semp 对象,这样其余的线程就可以执行了。

补充:

  • PV(page view) 网站的总访问量,页面浏览量或点击量,用户每刷新一次就会记录一次。

  • UV(unique vistor) 访问网站的一台电脑客户端为一个访客。一般来讲,时间上以00:00~24:00之内相同的客户端记录一次。

  • QPS(query per second) 即每秒查询数,QPS 很大程度代表了系统业务的繁忙程度。一旦当前 QPS 超过所设定的预警阀值,可以考虑对集群扩容,以免压力过大导致宕机。

  • RT(response time) 即请求的响应时间,这个指标非常关键,直接说明客户端的体验,因此任何系统设计师都想降低 RT 时间。

对系统进行峰值评估,采用所谓的80/20原则,即80%的请求20%的时间到达:

QRS = (PV * 80%) / (24 * 60 * 60 * 20%)

五、ReentrantLock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest implements Runnable { private Lock lock = new ReentrantLock(); // (1) public void run(){
try {
lock.lock(); // (2)
System.out.println(Thread.currentThread().getName() + "进入..");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "退出..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // (3)
}
} public static void main(String[] args) throws InterruptedException {
final ReentrantLockTest ur = new ReentrantLockTest(); for (int i = 0; i < 10; i++) {
new Thread(ur).start();
}
}
}
  1. ReentrantLock 一般用法:

    private Lock lock = new ReentrantLock();
    try {
    lock.lock();
    //do something
    } finally {
    lock.unlock();
    }
  2. condition 使用方法,注意 condition 可以实例化多个:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    condition.await(); //阻塞线程,释放锁
    condition.signal();//唤醒线程,不释放锁
  3. 公平锁(true)和非公平锁(false),非公平锁执行效率比公平锁高

    Lock lock = new ReentrantLock(boolean isFair);
  4. 读写锁,实现读写分离的锁,适用于读多写少的情况下(读读共享,读写互斥)

    private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); // (1)
    private ReadLock readLock = rwlock.readLock(); // (2)
    private WriteLock writeLock = rwlock.writeLock(); // (3) public void read(){
    try {
    readLock.lock();
    // do something
    } finally {
    readLock.unlock();
    }
    } public void write(){
    try {
    writeLock.lock();
    // do something
    } finally {
    writeLock.unlock();
    }
    }

每天用心记录一点点。内容也许不重要,但习惯很重要!