ConcurrentHashMap源码解读二

时间:2023-12-04 11:09:32

接下来就讲解put里面的三个方法,分别是

1、数组初始化方法initTable()

2、线程协助扩容方法helpTransfer()

3、计数方法addCount()

首先是数组初始化,再将源码之前,先得搞懂里面的一个重要参数,那就是sizeCtl。

sizeCtl默认为0,代表数组未初始化。

sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值。

sizeCtl为-1,表示数组正在进行初始化。

sizeCtl小于0,并且不是-1,表示数组正在扩容,-(1+n),表示此时有n个线程共同完成数组的扩容操作。

接下来讲解initTable()方法

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
    //第一次put的时候,table还没被初始化,所以能进入while。sizeCtl默认值是0,当有两个线程都想进行初始化时,线程A CAS成功,也就是else if为true,继续执行下面,而另一个线程cas就是false,就重新进行while循环,而这时sizeCtl为-1.所以这个线程就
    放弃cpu的控制权,说白了就是在多线程下保证初始化只执行一次。
while ((tab = table) == null || tab.length == 0) {//tab在这里赋值,是table的引用
if ((sc = sizeCtl) < 0)//sc在这里赋值。是sizeCtl的引用。
Thread.yield(); // lost initialization race; just spin//线程放弃cpu的控制权。
        //SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//SIZECTL是地址偏移量,如果SIZECTL对应地址的值与sc相等,说明当前的线程是第一个到达这条语句的线程,那么就会将SIZECTL地址所对应的值替换成-1,而SIZECTL地址偏移量对应的对象就是sizeCtl
try {
if ((tab = table) == null || tab.length == 0) {//再次进行判断,防止在进行U.compareAndSwapInt(this, SIZECTL, sc, -1)的时候有其他线程并发进入方法,导致出错,使用双重锁
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//sc在前边就赋值了,如果有初值,那么这里n就是设定的初始值,否则n就是默认容量。16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//基于初始长度,构建数组对象
table = tab = nt;
sc = n - (n >>> 2);//这里就是计算扩容阈值,并赋值给sc。也就是n-0.25n = 0.75n
}
} finally {
sizeCtl = sc;//将扩容阈值,赋值给sizeCtl,第一次初始化后。
}
break;
}
}
return tab;
}

所以在这个方法中,sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值。就是这个含义。

第二个要讲解的方法是addCount方法。

这段代码分为两个部分,一个是计数部分,另一个是扩容部分。

<用两种方法进行计数,一个是用cas对baseCount<baseCount是一个全局属性,volatile的>进行加法计数,另一个是用CounterCell数组,其实CounterCell对象就有一个属性是value,用CounterCell构建一个数组,然后哪个线程要进行加法,就用这个线程产生一个hash值,并用这个数与CounterCell数组的长度减一做与运算,得到的结果就是CounterCell数组的index,然后对这个index对应的CounterCell对象的value做加法。在最后统计计数的时候是用:baseCount+每个CounterCell的value>
countcell就是通过分散计算来减少竞争。其内部有一个基础值和一个哈希表。当没有竞争的时候在基础值上计数。有竞争的时候通过哈希表计数(每个线程有一个哈希值,通过哈希值确定在哈希表的位置,在这个位置进行计数,不同位置互不影响)

计数部分:通过baseCount和CounterCell数组,二选一的参与计数
扩容部分:当大于扩容阈值的时候进行扩容,当满足扩容条件时才能扩容
如果有别的线程正在进行扩容,那么就存在nextTable(一个全局属性,表示正在扩容的线程),就把nextTable作为参数传入transfer方法,就是让这个线程帮忙扩容nextTable
如果没有别的线程正在扩容,那么就把null传入transfer方法,让transfer方法创建一个nextTable

private final void addCount(long x, int check) {
CounterCell[] as; long b, s;//as表示counterCells引用, b表示获取的baseCount值, s应该是表示元素个数。
 //当CounterCell数组不为空,则优先利用数组中的CounterCell记录数量
    //或者当baseCount的累加操作失败,会利用数组中的CounterCell记录数量
    //条件一:as!=null true:表示counterCells以及初始化过了,当前线程应该将数据写入到对应的counterCell中。
            //as==null 也就是条件1为false,那么表示counterCells未初始化,当前所有线程应该将数据写到baseCount中。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//条件二:true:表示当前线程cas替换数据成功  false表示发生竞争了,可能需要重试或者扩容。//因为这里有个!,所以应该false才能进入
    //什么时候会进入到if判断里面
         //1.条件一:as!=null true:表示counterCells以及初始化过了,当前线程应该将数据写入到对应的counterCell中。
        //2.条件二:false表示发生竞争了,可能需要重试或者扩容
CounterCell a; long v; int m;//a表示当前线程命中的CounterCell单元格 v表示期望值,m表示as数组的长度。
boolean uncontended = true; //标识是否有多线程竞争 ,//true表示未发生竞争,false表示发生竞争。
      //当as数组为空
        //或者当as长度为0
        //或者当前线程对应的as数组桶位的元素为空
        //或者当前线程对应的as数组桶位不为空,但是累加失败
        //条件一:as==null 为true,说明counterCells未初始化,那么上面就是根据多线程写base发生竞争进入到这里的。
                //as!=null为false,说明counterCells已经初始化了。当前线程应该是找自己的counterCells写值。
if (as == null || (m = as.length - 1) < 0 ||
        //ThreadLocalRandom.getProbe()表示当前线程的hash值。 m是CountCell长度-1,as.length一定是2的次方数,比如长度为16,那么减一就是15
            //也就是b1111,那么与上之后肯定是小于等于当前长度的值,也就是下标。
            //如果a==null,也就是条件为true,说明当前线程对应下标的CountCell为空,那么就需要创建
            //如果是false,不为空,说明下一步想要将x的值添加到CountCell中。
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
         //条件三:true:表示cas失败,意味着当前线程对应的CountCell有竞争,
                    //false,表示cas成功
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
       //都有哪些情况会调用下面的方法:
            //1.as==null 为true,说明counterCells未初始化,那么上面就是根据多线程写base发生竞争进入到这里的,那么初始化。
            //2.如果a==null,也就是条件为true,说明当前线程对应下标的CountCell为空,那么就需要创建
            //3.true:表示cas失败,意味着当前线程对应的CountCell有竞争,那么就可能是重试或者扩容。
            //以上任何一种情况成立,都会进入该方法,传入的uncontended是false
fullAddCount(x, uncontended);//也就是countCells是一个全局得volatile的CountCell数组,创建实在fullAddCount方法中。
return;
}
if (check <= 1)
return;
s = sumCount();//计算元素个数
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
       //当元素个数达到扩容阈值
        //并且数组不为空
        //并且数组长度小于限定的最大值
        //满足以上所有条件,执行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) { //sc小于0,说明有线程正在扩容,那么会协助扩容
      //扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
            //扩容线程加1,成功后,进行协助扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);//协助扩容,newTable不为null
}
          //否则sc>=0,说明是首个扩容的线程,所以transfer传入的参数是null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

接下来就是讲解上面的fullAddCount方法

① 当CounterCell数组不为空,优先对CounterCell数组中的CounterCell的value累加

② 当CounterCell数组为空,会去创建CounterCell数组,默认长度为2,并对数组中的CounterCell的value累加

③ 当数组为空,并且此时有别的线程正在创建数组,那么尝试对baseCount做累加,成功即返回,否则自旋

private final void fullAddCount(long x, boolean wasUncontended) {//wasUncontended只有counterCells初始化之后,并且当前线程竞争修改失败,才会是false。其余情况都是true。
int h;//h表示线程的hash值。
    //获取当前线程的hash值
if ((h = ThreadLocalRandom.getProbe()) == 0) {//条件成立:说明当前线程还未分配hash值。
ThreadLocalRandom.localInit();//给当前线程分配hash值 // force initialization
h = ThreadLocalRandom.getProbe();//取出当前线程的hash值,赋值给h
wasUncontended = true;//为啥这里强制设为true,当前线程肯定是写入到了countCells[0]位置,不把它当作一次真正的竞争。
}
boolean collide = false; //标识是否有冲突,如果最后一个桶不是null,那么为true //表示扩容意向 false一定不会扩容,true可能会扩容  // True if last slot nonempty
for (;;) {//自旋
CounterCell[] as; CounterCell a; int n; long v;//as表示counterCells引用,a表示当前线程命中的CounterCell,n表示counterCells数组长度,v表示期望值
       //数组不为空,优先对数组中CouterCell的value累加
        //CASE1:表示countCells已经初始化了,当前线程应该将数据写入到对应的CounterCell中。
if ((as = counterCells) != null && (n = as.length) > 0) {
      //2.如果a==null,也就是条件为true,说明当前线程对应下标的CountCell为空,那么就需要创建
            //3.true:表示cas失败,意味着当前线程对应的CountCell有竞争,那么就可能是重试或者扩容。这两种情况会进入到这个if判断中
            //线程对应的桶位为null
         //CASE1.1:a = as[(n - 1) & h]) == null true表示当前线程对应的下标位置的CounterCell为null,需要创建new CounterCell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { //true:表示当前锁未被占用,false表示锁被占用 // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create//创建CounterCell对象
                //利用CAS修改cellBusy状态为1,成功则将刚才创建的CounterCell对象放入数组中
                    //条件一:true:表示当前锁未被占用,false表示锁被占用
                    //条件二:true:表示当前线程获取锁成功,false表示当前线程获取锁失败。
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;//是否创建成功的标记
try { // Recheck under lock
CounterCell[] rs; int m, j;//rs表示当前countCells引用,m表示rs的长度,j表示当前线程命中的下标。
                  //桶位为空, 将CounterCell对象放入数组
                            //条件一条件二恒成立
                            // rs[j = (m - 1) & h] == null为了防止其它线程初始化过该位置,然后当前线程再次初始化该位置,导致数据丢失。
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;//表示放入成功
}
} finally {
cellsBusy = 0;
}
if (created)//成功退出循环
break;
continue; //桶位已经被别的线程放置了已给CounterCell对象,继续循环 // Slot is now non-empty
}
}
collide = false;//将扩容意向改为false,原因是因为再CASE1.1中当前CounterCell都是为null,不可能不让写。因此不需要扩容。
}
          //桶位不为空,重新计算线程hash值,然后继续循环
            //CASE1.2:只有一种情况会来到这里,wasUncontended只有counterCells初始化之后,并且当前线程竞争修改失败,才会是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
        //重新计算了hash值后,对应的桶位依然不为空,对value累加
            //成功则结束循环
            //失败则继续下面判断
            //CASE1.3:当前线程rehash过hash值,然后新命中的CounterCell不为空。则来到这里。
            //true:写成功,退出循环,
            //false:表示rehash之后命中的新的cell也有竞争,重试1次  再重试一次
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
         //数组被别的线程改变了,或者数组长度超过了可用cpu大小,重新计算线程hash值,否则继续下一个判断
            //CASE1.4:条件一:n >= NCPU true->表示扩容意向改为false,表示不扩容了。false说明counterCells数组还可以扩容
            //条件二:counterCells != as true表示其它线程已经扩容过了,当前线程rehash之后重试即可。
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
        //当没有冲突,修改为有冲突,并重新计算线程hash,继续循环
            //CASE1.5  !collide=true,设置扩容意向为true,但是不一定真的发生扩容。因为一旦进入这里,那么又会rehash一下,又会重来。
else if (!collide)
collide = true;
         //如果CounterCell的数组长度没有超过cpu核数,对数组进行两倍扩容
            //并继续循环
            //CASE1.6:真正扩容的逻辑
            //条件一:cellsBusy == 0 true表示当前无锁状态,当前线程可以去竞争这把锁。
            //条件二:U.compareAndSwapInt(this, CELLSBUSY, 0, 1)  true表示当前线程获取锁成功,可以执行扩容逻辑。
                                                            //false表示当前时刻有其它线程正在做扩容相关的操作。
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale //这里又有双重检测,就是为了防止扩容过了又再次扩容。
CounterCell[] rs = new CounterCell[n << 1];//扩容为两倍扩容
for (int i = 0; i < n; ++i)
rs[i] = as[i];//将旧数组的值放到新数组。
counterCells = rs;
}
} finally {
cellsBusy = 0; //释放锁
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);//重置当前hash值 rehash
}
     //CounterCells数组为空,并且没有线程在创建数组,修改标记,并创建数组,因为前面CASE1是数组不为空,所以这里是数组为空
        //CASE2:当前条件countCells还未初始化,as为null
        //条件一:cellsBusy == 0  true表示当前未加锁
        //条件二:counterCells == as,为什么又重新比较一遍,明明前面CASE1已经赋值未null了。因为其它线程可能会在你给as赋值之后修改了counterCells。
        //条件三:如果未true,表示获取锁成功,会把cellsBusy改成1。false表示其它线程正在持有这把锁,那么当前线程就进不了这里面了。
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {//为什么这里又判断了一下counterCells == as,防止其它线程已经初始化了,当前线程再次初始化,就会覆盖掉其它线程初始化的CountCell,导致丢失数据。
CounterCell[] rs = new CounterCell[2];//初始容量为2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
    //CASE3:
        //1.当前cellsBusy加锁状态,表示其他线程正在初始化countCells,所以当前线程将值累加到baseCount。
        //2.countCells被其它线程初始化后,当前线程需要将数据累加到base。
        //数组为空,并且有别的线程在创建数组,那么尝试对baseCount做累加,成功就退出循环,失败就继续循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

总结:addCount方法通过属性baseCount 和 counterCell数组中所有的元素的和来记录size
在无竞争的情况下,通过cas当前map对象的baseCount为baseCount + 1,
有竞争情况下,上诉cas失败,会初始化一个长度为2的CounterCell数组,数组会扩容,每次扩容成两倍,每个线程有在counterCell数组中对应的位置(多个线程可能会对应同一个位置), 如果位置上的CounterCell元素为空,就生成一个value为1的元素,如果不为空,则cas当前CounterCell元素的value为value + 1;如果都失败尝试cas当前map对象的baseCount为baseCount + 1。