Java多线程基础知识(五)

时间:2022-04-07 04:59:51

一. Java中的13个原子操作类

在Jdk1.5中,这个包中的原子操作类提供了一种用法简单,性能高效,线程安全的更新一个变量的方式。

1. 原子更新基本类型类

AtomicBoolean : 原子更新布尔类型

AtomicInteger : 原子更新整型

AtomicLong : 原子更新长整型

2. 原子更新数组

AtomicIntegerArray: 原子更新整型数组里的元素

AtomicLongArray:原子更新长整型数组里的元素

AtomicReferenceArray:原子更新引用类型数组里的元素

3. 原子更新引用类型

AtomicReference:原子更新引用类型

AtomicReferenceFieldUpdater:原子更新引用类型里的字段

AtomicMarkableReference:原子更新带有标记的引用类型

4. 原子更新字段类

AtomicIntegerFieldUpdater:原子更新整型的字段的更新器

AtomicLongFieldUpdater:原子更新长整型字段的更新器

AtomicStampedReference:原子更新带有版本号的引用类型

二. Java中的并发工具类

在Jdk1.5中,提供了几个非常有用的并发工具类,它们提供了一种并发流程控制手段。

1. CountDownLatch 等待多个线程完成

CountDownLatch允许一个或多个线程等待其他线程完成操作

package com.bochao.concurrency;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {

	static CountDownLatch countDownLatch = new CountDownLatch(2);

	public static void main(String[] args) {

		new Thread(new Runnable() {

			@Override
public void run() { System.out.println(1);
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown(); System.out.println(2);
countDownLatch.countDown();
}
}).start(); try {
countDownLatch.await();
System.out.println(3);
} catch (InterruptedException e) {
e.printStackTrace();
} } }

2. CyclicBarrier 同步屏障

让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

package com.bochao.concurrency;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; class A implements Runnable{ @Override
public void run() {
// TODO Auto-generated method stub
System.out.println("之前触发的动作!");
}
} public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(1);
}
}).start(); try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(2);
} }

CyclicBarrier应用场景

它多应用于使用多线程计算数据的场景,最后使用前置的主线程任务来统计合并计算结果。

CyclicBarrier 和 CountDownLatch的区别

1. CountDownLatch 的计数器只能使用一次

2. CyclicBarrier计数器可以使用reset()重置

3. Semaphore 控制并发线程数

用来控制同时访问特定资源的线程数量,它通过协调各个线程以保证合理的使用公共资源。

package com.bochao.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static final int THREAD_COUNT = 20; // 创建固定大小的20个线程池
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); // 信号量10个, 只允许并发执行10个线程
private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { for(int i=0; i<THREAD_COUNT; i++){ threadPool.execute(new Runnable() { @Override
public void run() { try {
// 获取执行许可
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("指定特定代码!");
semaphore.release();
}
});
} threadPool.shutdown();
} }

一些比较有用的方法:

<1>. int availablePermits() :返回此信号量中当前可用的许可证数

<2>. int getQueueLength() :返回正在等待获取许可证的线程数

<3>. boolean hasQueuedThreads():是否有线程正在等待获取许可证

<4>. void reducePermits(int reduction):减少reduction个许可证,是个protected方法

<5>. Collection getQueuedThreads() :返回所有等待获取许可证线程集合,是个protected方法

4. Exchanger 线程间交换数据

Exchanger是一个用于线程间协作的工具类。可用于线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此数据,这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方线程。

package com.bochao.concurrency;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override
public void run() { String A = "A"; try {
exgr.exchange(A);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}); threadPool.execute(new Runnable() { @Override
public void run() { String B = "B";
try {
String A = exgr.exchange(B);
System.out.println("A录入数据:" + A);
System.out.println("B录入数据:" + B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} }