浅谈ConcurrentHashMap实现原理

时间:2023-03-10 06:55:03
浅谈ConcurrentHashMap实现原理

我们都知道HashTable是线程安全的类,因为使用了Synchronized来锁整张Hash表来实现线程安全,让线程独占;

ConcurrentHashMap的锁分离技术就是用多个锁来控制对Hash表的不同部分进行修改,因为我可能只需要对一小块部分进行操作,而如果锁整张表开销太大了,其内部实现就是用Semgent来控制的,每个Semgent都是一个小的HashTable,他们有自己的锁;

然后有些方法需要访问整个表,比如Size,ContainsValue肯定是要访问整个表的数据,这个时候就需要锁整张表,ConcurrentHashMap就会按顺序锁定每个Segment,操作完毕之后再按顺序释放每个Segment,按顺序是为了防止出现死锁;

引用一张图片解释ConcurrentHashMap的结构:

浅谈ConcurrentHashMap实现原理

可以看出就是在HashMap的基础上多了一个数组(暂且这样理解),数组的每个元素就是Segment;Segment<K,V> extends ReentrantLock implements Serializable;

ConcurrentHashMap的构造函数:

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

ConcurrentHahsMap进行Put操作的时候,对Key需要进行3次Hash运算才能确定最终的位置:hash1(key) -> x1(得到一个值);hash2(x1) -> x2(确定Segment位置,第二次hash是为了减少hash碰撞);hash3(x2) -> x3(确定元素放入哪个HashEntry);这里只是用比较好理解的白话说,下面会说原理;

ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点);

 static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}

可以看出,除了value不是final,其next属性都是final,说明不能从hash链的中间或尾部添加或删除节点;对于Put操作,可以一律将元素添加到hash链的头部,而对于remove操作,从中间删除一个节点,将该节点的前面所有节点复制一份并指向删除节点的下一节点;这会产生两条Hash链,这也是为了保证多线程访问的同步性;将value设置成volatile,这样避免了加锁;因为一个线程在执行get,另一个线程在执行remove,remove还没有执行完的时候,get能看到remove之前的值,如下图:

浅谈ConcurrentHashMap实现原理

下面说说Segment的数据结构:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* count用来统计该段数据的个数,它是volatile,如增加/删除节点,都要写count值,每次读取操作开始都要读取count的值。
*/
transient volatileint count;
/**
* modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变
*/
transient int modCount;
/**
* rehash界限值
*/
transient int threshold;
/**
* 就是上面的HashEntry
*/
transient volatile HashEntry<K,V>[] table;
/**
* 负载因子
*/
final float loadFactor;
}

ConcurrentHashMap的get方法是直接调用Segment的get方法:

public V get(Object key) {
int hash = hash(key); // throws NullPointerException if key null
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile 当前桶的数据个数是否为0
HashEntry<K,V> e = getFirst(hash); 得到头节点
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
 V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}

get方法是不需要加锁的,因为上面已经看到了count和HashEntry都是volatile;

源码中增加value可能为null的情况,是为了保险起见,当节点正在发生改变而又未锁定时就可能为空,而HashEntry中的value不是final的,所以个人认为是为了加此判断;

如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景;

接下来看看Put操作:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}

先上个锁,做个预判,容量+1的时候需不需要扩容,这个判断相比于HashMap做的非常精妙,因为HashMap是在增加元素之后才判断是否需要扩容,如果我扩容之后不增加元素,就白扩容了;

通过索引定位到HashEntry,判断如果不为空,替换节点的value,否则new一个Entry,将此Entry插入到链头,由构造函数得知,它的next则是first;

扩容默认也是会创建两倍容量的数组,进行计算将原数组的数据插入到新数组,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容;

ConcurrentHashMap的size方法

需要统计所有segment的元素总和,为了不影响性能,它会先2次尝试不锁住segment来统计大小,如果统计的过程中count发生了变化,则会把所有的segment的put和remove方法全部锁住;通过modCount来协助判断容器是否发生变化;统计size前后判断modCount是否发生变化即可;

参考:http://www.cnblogs.com/ITtangtang/p/3948786.html