ConcurrentHashMap 源码分析

TOP 带着问题看源码

  1. ConcurrentHashMap 是怎么保证线程安全的

1. 基本介绍

顾名思义,ConcurrentHashMap 是线程安全的 HashMap,HashMap 我们之前的文章已经分析过了,这次主要分析 ConcurrentHashMap 是怎么保证线程安全的。

img

  • AbstractMap 实现类

    提供一些围绕着iterator的基础方法

  • ConcurrentMap 接口

    声明是线程安全的 Map

  • Serializable 接口

    标记该类是可序列化的

2. 成员变量分析

1
2
3
4
5
6
7
8
// 存储数据数组
transient volatile Node<K,V>[] table;
// 扩容时新生成的数据,table 的2倍
private transient volatile Node<K,V>[] nextTable;
// 基础计数值,没有竞争时候使用
private transient volatile long baseCount;
// 控制符,-1 表示有线程正在初始化;-N 表示有N - 1个线程正在进行扩容;默认0 表示还没初始化;>0 表示下一次扩容或者初始化的大小
private transient volatile int sizeCtl;

3. 核心方法分析

3.1 新增数据操作

3.1.1 put(K key, V value)

put 方法调用的是 putVal,putIfAbsent 参数传的是 false,代表不去重

核心逻辑如下:

  1. 计算hash,这里算的是一定大于0的,当hash 小于 0代表是两种情况: ① MOVED(-1) 正在扩容 ② TREEBIN(-2) 此元素后是红黑树

  2. 检查是否初始化,没有初始化先初始化一次

  3. 根据hash值,查找对应位置的元素

    3.1 如果没有元素,就 CAS 放进去

    3.2 如果有元素,加锁针对链表或者红黑树(通过hash是否大于0来判断)进行替换或添加

  4. 检查链表长度,大于8就转为红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或者 value 为 null 直接抛异常
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
// 这里与hashmap不同的是计算过程中引入了HASH_BITS(0x7fffffff,高于30位都是0)将高位置0
int hash = spread(key.hashCode());
// 标记默认为0(数组)
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 懒初始化策略,使用时候发现为空才去调用初始化方法
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 和HashMap的区别是采用 volatile读
// 如果当前hash对应的位置为空使用 CAS 进行添加(放置)元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// MOVED(-1) 是预留的hash值给,代表别的线程正在进行扩容操作
else if ((fh = f.hash) == MOVED)
// 帮助扩容
tab = helpTransfer(tab, f);
// 走到这里是说明计算的hash对应地方已经有元素了
// 执行替换或添加(链表、树)
else {
V oldVal = null;
// 对这个节点加锁
synchronized (f) {
// 在 synchronized 语义中再次判断一次
if (tabAt(tab, i) == f) {
// hash值不为预留的hash,这里主要是判断是不是 TREEBIN(-2) 来看此节点是否为红黑树
if (fh >= 0) {
// 标记为1,链表在遍历查找的时候会累加这个值(链表)
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果key也一样,就直接替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 否则,找到下一个为空的地方尾部插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
// 标记为2,小于8就行(红黑树)
binCount = 2;
// 调用红黑树的添加方法
// 二分查找到就直接返回,查不到就构建一个节点加到红黑树再返回
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 上面链表里标记的,这个值是代表了链表的长度
// 大于8就转为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 数组的元素个数+1
addCount(1L, binCount);
return null;
}

3.2 查找数据操作

3.2.1 get(Object key)

查找元素相对于新增来说,不需要考虑并发,所以逻辑较为简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算 hash 值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果key相同,直接返回这个元素
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果是红黑树(TREEBIN(-2)),调用红黑树的查找方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 否则就是链表了,遍历链表进行查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
// 找不到返回null
return null;
}

3.3 初始化 && 扩容操作

3.3.1 initTable()

核心逻辑就是:

  1. 检查 sizeCtl
  2. 设置 sizeCtl
  3. 设置容量,初始化 table(0.75的负载因子)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 循环
while ((tab = table) == null || tab.length == 0) {
// 检查 sizeCtl,如果小于0说明正在进行初始化或者扩容,先让出 CPU 等待一会
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 当前线程在初始化,设置为-1,失败就重试
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 如果sc为空就设置容量为默认容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 设置sc为n的0.75倍,可以看到这里是写死的,所以没有全局可变的负载因子
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

3.3.2 transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

核心逻辑如下:

  1. 新数组初始化为2倍
  2. 拆分为l两个链表(树)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 如果每个核要处理的小于16就设置为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化,double size
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 新数组的大小
int nextn = nextTab.length;
// ForwardingNode 是用来临时存放扩容时的节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh
// 循环,计算i值(--i)
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果迁移已经完成
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// CAS 更新扩容阈值,sizeCtl - 1说明新加入了一个当前线程参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// MOVED(-1) 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 加锁
synchronized (f) {
// 节点复制
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 拆分为2个链表,这里是和hashMap类似的
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 根据是否为0来判断是低位链表还是高位链表
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 遍历链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 放到低位链表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 放到高位链表
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位不变
setTabAt(nextTab, i, ln);
// 高位+n
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树
else if (f instanceof TreeBin) {
// 分为两个树,如果小于6就退化为链表
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

3.4 元素个数操作

3.4.1 addCount(long x, int check)

主要做了两件事,一件事是更新元素个数,一个是检查是否扩容(这里省略这部分…)

当你看到 baseCount 和 CounterCell[] 是不是很熟悉,没错!和前面分析的 LongAdder 是一样的,就是散列开来分开计数,最后在统计,增加并行性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
...
}
}

3.4.2 size()

统计 baseCount + counterCells

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
// 其实就是把基础统计和散列各线程的计数加起来,这里计数是不精准的
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

通过对核心方法的分析,回到问题 TOP 1 可以知道,其主要采用 synchronized + CAS 来保证线程安全的

4. 总结

可以发现ConcurrentHashMap 采用 synchronized + CAS 来保证线程安全,尽量提高并行性。