Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

时间:2023-03-10 03:04:32
Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?更多技术分享可关注我

前言

如果仔细阅读过Netty的线程调度模型的源码,或者NIO线程对象及其线程池的创建源码,那么肯定会遇到类似“AtomicIntegerFieldUpdater”的身影,不禁想知道——Netty为何不直接使用原子类包装普通的比如计数的变量?

下面带着这个疑问,深入Netty以及JDK源码去窥探一二,顺便学习先进的用法。原文:​Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

JDK的Atomic原子操作类实现机制

在JDK里,Atomic 开头的原子操作类有很多,涉及到 Java 常用的数字类型的,基本都有相应的 Atomic 原子操作类,如下图所示:

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

原子操作类都是线程安全的,编码时可以放心大胆的使用。下面以其中常用的AtomicInteger原子类为例子,分析这些原子类的底层实现机制,辅助理解Netty为何没有直接使用原子类。具体使用的demo就不写了,想必Javaer都多少用过或者见过,直接看AtomicInteger类核心源码:

 private volatile int value; // 简化了部分非核心源码

// 初始化,简化了部分非核心源码
public AtomicInteger(int initialValue) {
value = initialValue;
}
public final int get() {
return value;
}
// 自增 1,并返回自增之前的值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 自减 1,并返回自增之前的值
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}

以上,AtomicInteger可以对int类型的值进行线程安全的自增或者自减等操作。从源码中可以看到,线程安全的操作方法底层都是使用unsafe方法实现,这是一个JDK的魔法类,能实现很多贴近底层的功能,所以并不是Java的实现的,但是能保证底层的这些getAndXXX操作都是线程安全的,关于unsafe具体的用法和细节,可以参考这篇文章Java魔法类:Unsafe应用解析(https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html,可能无法直接打开,复制黏贴到浏览器即可)

题外话:如果AtomicXXX的对象是自定义类型呢?不要慌,Java 也提供了自定义类型的原子操作类——AtomicReference,它操作的对象是个泛型对象,故能支持自定义的类型,其底层是没有自增方法的,操作的方法可以作为函数入参传递,源码如下:

 // 对 x 执行 accumulatorFunction 操作
// accumulatorFunction 是个函数,可以自定义想做的事情
// 返回老值
public final V getAndAccumulate(V x,
BinaryOperator<V> accumulatorFunction) {
// prev 是老值,next 是新值
V prev, next;
// 自旋 + CAS 保证一定可以替换老值
do {
prev = get();
// 执行自定义操作
next = accumulatorFunction.apply(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

JDK的AtomicXXXFieldUpdater原子更新器及其优势

在Java5中,JDK就开始提供原子类了,当然也包括原子的更新器——即后缀为FieldUpdater的类,如下Integer、Long,还有一个自定义类型的原子更新器,共三类:

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

这些原子更新器常见于各种优秀的开源框架里,而很少被普通的业务程序员直接使用,其实这些原子更新器也可以被用来包装共享变量(必须是volatile修饰的对象属性),来为这些共享变量实现原子更新的功能。这些被包装的共享变量可以是原生类型,也可以是引用类型,那么不禁要问:已经有了原子类,为啥还额外提供一套原子更新器呢?

简单的说有两个原因,以int变量为例,基于AtomicIntegerFieldUpdater实现的原子计数器,比单纯的直接用AtomicInteger包装int变量的花销要小,因为前者只需要一个全局的静态变量AtomicIntegerFieldUpdater即可包装volatile修饰的非静态共享变量,然后配合CAS就能实现原子更新,而这样做,使得后续同一个类的每个对象中只需要共享这个静态的原子更新器即可为对象计数器实现原子更新,而原子类是为同一个类的每个对象中都创建了一个计数器 + AtomicInteger对象,这种开销显然就比较大了。

下面看一个JDK使用原子更新器的例子,即JDK的BufferedInputStream,如下是源码的片段节选:

 public class BufferedInputStream extends FilterInputStream {
private static int DEFAULT_BUFFER_SIZE = 8192;
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
protected volatile byte buf[];
/**
* Atomic updater to provide compareAndSet for buf. This is
* necessary because closes can be asynchronous. We use nullness
* of buf[] as primary indicator that this stream is closed. (The
* "in" field is also nulled out on close.)
*/
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class, byte[].class, "buf");

可以看出,每个BufferedInputStream对象都包含了一个buf属性,该属性是对象属性,且被volition修饰,并被原子更新器AtomicReferenceFieldUpdater包装,注意这个引用类型的原子更新器是静态类型的,这意味着不论用户创建了多少个BufferedInputStream对象,在全局都只有这一个原子更新器被创建,这里之所以不用原子类AtomicReference直接包装buf属性,是因为buf是一个byte数组,通常会是一个比较大的对象,如果用原子类直接包装,那么后续每个BufferedInputStream对象都会额外创建一个原子类的对象,会消耗更多的内存,负担较重,因此JDK直接使用了原子更新器代替了原子类,Netty源码中的类似使用也是如出一辙。

另外一个重要原因是使用原子更新器,不会破坏共享变量原来的结构,回到上述JDK的例子,buf对外仍然可以保留buf对象的原生数组属性,只不过多了一个volatile修饰,外界可以直接获取到这个byte数组实现一些业务逻辑,而且在必要的时候也能使用原子更新器实现原子更新,可谓两头不耽误,灵活性较强!

还有一个可能的疑问点需要理解,即原子更新器虽然是静态的,但是其修饰的共享变量确仍然是类的对象属性,即每个类的对象仍然是只包含自己那独一份的共享变量,不会因为原子更新器是静态的,而受到任何影响。

结论:实现原子更新最佳的方式是直接使用原子更新器实现。一方面是更节省内存,另一方面是不破坏原始的共享变量,使用起来更灵活。当然如果是时延要求没有那么高的场景,那么就不需要这么严苛,直接使用原子类就OK,毕竟原子类的编码简单,开发效率高,不易出错。

品Netty源码,学习原子更新的最佳实现方式

前面说了很多理论,下面看一段Netty源码,看Netty是如何优雅的使用原子更新器的。下面是Netty的NIO线程实现类——SingleThreadEventExecutor的部分源码,省略了很多和本次分析无关的代码:

 /**
* Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
*/
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER;
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);

static {
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}
STATE_UPDATER = updater;
}

private final Queue<Runnable> taskQueue;
private final Executor executor;
private volatile Thread thread;
private volatile int state = ST_NOT_STARTED;

以上截取了一小片段,并删除了注释,可以清晰的看到Netty封装了JDK的Thread对象,一些标识线程状态的静态常量,线程执行器,异步任务队列,以及标识线程状态的属性state等,其中重点关注state,这个属性是普通的共享变量,由volatile修饰,并且被静态的原子更新器STATE_UPDATER包装。

下面看NIO线程的启动源码:

     /**
* NioEventLoop线程启动方法, 这里会判断本NIO线程是否已经启动
*/
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}

注释写到了,启动NIO线程之前会做一次是否已经启动的判断,避免重复启动,这个判断逻辑就是前面提到的原子更新器实现的,当本NIO线程实例没有启动时,会做一次CAS计算,注意CAS对应操作系统的一个指令,是原子操作,如果是多个外部线程在启动NIO线程,那么同时只有一个外部线程能启动成功一次,后续的线程不会重复启动这个NIO线程。保证在NIO线程的一次生命周期内,外部线程只能调用一次doStartThread()方法,这样可以实现无锁更新,且没有自旋,性能较好,这里之所以不需要自旋,是因为启动线程就应该是一锤子买卖,启动不成功,就说明是已经启动了,直接跳过,无需重试。

在看一个自旋的用法:

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?

在NIO线程被优雅(也可能异常)关闭时,会在死循环里,结合CAS算法,原子更新当前NIO线程的状态为关闭中。。。这里有两个注意事项:

1、和线程安全的启动NIO线程的逻辑不一样,更新线程状态必须成功,不是一锤子买卖,所以需要自旋重试,直到CAS操作成功

2、需要使用局部变量缓存外部的共享变量的旧值,保证CAS操作执行期间该共享变量的旧值不被外部线程修改

3、同样的,每次执行CAS操作之前,必须判断一次旧值,只有符合更新条件,才真的执行CAS操作,否则说明已经被外界线程更新成功,无需重复操作,以提升性能。

Netty这样做也侧面反映Nerty的源码确实很优秀,平时的业务开发,如果有类似场景,那么可以参考学习这两类用法。

总结使用原子更新器的注意事项:

1、包装的必须是被volatile修饰的共享变量

2、包装的必须是非静态的共享变量

3、必须搭配CAS的套路自行实现比较并交换的逻辑

4、自行实现比较并交换的逻辑时需要注意:如果是非一锤子买卖的原子更新操作,那么必须用局部变量缓存外部的共享变量的旧值,具体原因可以参考:Netty的线程调度模型分析(10)《多线程环境下,实例变量转为局部变量的程序设计技巧》,且放在一个循环里操作,以保证最终一致性。

后记

dashuai的博客是终身学习践行者,大厂程序员,且专注于工作经验、学习笔记的分享和日常吐槽,包括但不限于互联网行业,附带分享一些PDF电子书,资料,帮忙内推,欢迎拍砖!

Netty为什么不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新变量呢?