并发容器之ConcurrentHashMap源码解析

时间:2021-09-22 15:01:18

该博客主要针对的是Java1.8的ConcurrentHashMap,如果有说的不对的地方欢迎大家留言。

目录


描述

1. 主要功能

该Hash表的主要功能是保证了并发情况下的可读性(尤其是他的Get(),同时也包括了迭代),同时尽可能少的减少代码的更新,次要目标是保持空间消耗。相同或比java.util.hashmap,并支持高许多线程在空表上的初始插入效率。


2.实现的基本原理

Java 1.8 与 Java 1.7 实现的原理不同

1.7版本采用的锁分段技术,是由Segment数组结构和HashEntry数组结构组成。
1.8版本采用Node + CAS + Synchronized来保证并发安全进行实现,底层依然采用数组+链表+红黑树的存储结构,结构如下:并发容器之ConcurrentHashMap源码解析


3.为什么要使用ConcurrentHashMap

因为在并发的情况下HashMap的put操作会导致死循环,是因为多线程会导致HashMap中的Entry形成一个环形链表。然后线程安全的HashTable虽然是线程安全的但是效率太低了,因为HashTable的Get方法和Put方法都是同步方法所以如果多线程的时候会有很大的竞争。


类的关系

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
![这里写图片描述](http://img.blog.csdn.net/20170623111127449?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcXFfMjE1MDg2MzU=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)

Map

  • Map 接口
    • 最上级的基本Map接口,定义了实现Map的基本方法。
  • AbstractMap 抽象类
    • 一个对Map接口的默认实现方法。要实现一个Map大部分都继承了它。

ConcurrentMap

  • ConcurrentMap 接口
    • 继承了Map接口,定义了实现ConcurrentMap的基本方法。
  • ConcurrentHashMap 类
    • 继承了AbstractMap,实现了ConcurrentMap接口,具体的实现类。

具体分析

1.构造方法

ConcurrentHashMap 一共有五个构造方法分别是:

  1. 创建一个默认的ConcurrentHashMap初始大小是16。
 ConcurrentHashMap map=new ConcurrentHashMap();

2.创建一个ConcurrentHashMap,并指定初始化大小。

 int size=20;
ConcurrentHashMap map=new ConcurrentHashMap(size);

源码

 /*定义的最大容量 purposes*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/*构造方法*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/*计算大小的方法 - 该方法用于计算不小于c的最小的2的幂 相当于table的大小总是小于c的最小的2的幂*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

3.创建一个ConcurrentHashMap,并指定初始大小和负载因子
负载因子loadFactor:当容量达到initialCapacity*loadFactor时,执行扩容

int concurrencyLevel=1
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

4.创建一个ConcurrentHashMap,并指定初始大小和负载因子和预估的并发更新线程数

    public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

5.创建一个ConcurrentHashMap,将传入的Map映射到成新的ConcurrentHashMap.

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

注意:ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作。
所以接下来我们来看一下它真正的初始化方法:

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
/*通过判断sizeCtl来得知是否进行了初始化*/
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
/*如果没有初始化那就使用UnSafe类中的CAS方法改变SIZECTL的状态为-1*/
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
/*创建Node*/
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
/*表示初始化完成后table的容量*/
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

1.Put方法

public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
/*判空*/
if (key == null || value == null) throw new NullPointerException();
/*对hashCode进行再散列,算法为(h ^ (h >>> 16)) & HASH_BITS*/
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
/*这里进行了真正的初始化*/
tab = initTable();
/*判断首节点是否为空*/
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
/*如果为空创建一个节点,成功则跳出Put循环*/
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
/*如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
/*通过getObjectVolatile方法来获取Node 因为volatile修饰数组变量时,对其他线程可见的是数组对象的内存地址,而不是里面的元素*/
if (tabAt(tab, i) == f) {
/*如果f.hash >= 0,说明f是链表结构的头结点,遍历链表,如果找到对应的node节点,则修改value,否则在链表尾部加入节点。*/
if (fh >= 0) {
binCount = 1;
/*循环遍历链表*/
for (Node<K,V> e = f;; ++binCount) {
K ek;
/*这里如果Key的值相同就覆盖旧的值*/
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
/*如果f是TreeBin类型节点,说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点。*/
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
/*如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。*/
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
/*检查是否需要扩容,构建nextTable*/
addCount(1L, binCount);
return null;
}

2.Get方法

Get方法相对简单,没有用到同步方法

大概步骤:
1.计算key的hash。
2.通过tabAt方法获取对于地址的value.

对于Get方法能保证一致性,我是这样理解的:因为这里采用了tabAt方法,并且采用了volatile关键字。

 public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

3.Remove方法

public V remove(Object key) {
return replaceNode(key, null, null);
}

根据代码可以看出只是做了一个replaceNode方法,根据对应的key将对应的value进行赋空操作,如果成功会返还一个key对应的值。


不管是replace方法还是remove方法具体的实现都是下面这段代码:

final V replaceNode(Object key, V value, Object cv) {
/*计算key值的hash*/
int hash = spread(key.hashCode());
/*循环*/
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
/*判Null 并将使用tabAt方法获取到的Node赋值给f*/
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
/*是否在扩容*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
/*开始remove*/
else {
V oldVal = null;
boolean validated = false;
/*锁住f 及这个Node节点*/
synchronized (f) {
/*再取一次判断是否取对值*/
if (tabAt(tab, i) == f) {
/*链表的处理方法*/
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
/*匹配key*/
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
/*匹配value*/
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
/*将当前位置的节点设置为要移除的下一个节点*/
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
/*树的移除方法*/
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
/*返回移除的对象*/
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

4.Size方法

Java 1.8 的size方法主要是计算元素个数保存中的baseCount,部分元素的变化个数保存在CounterCell数组中的值进行累加:

public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

结束

参考[占小狼](http://www.jianshu.com/p/e694f1e868ec)