LongAdder 源码分析

摘要:

  1. 有了 AtomicLong 为什么还会有 LongAdder

TOP 带着问题看源码

  1. 有了 AtomicLong 为什么还会有 LongAdder

1. 基本介绍

LongAdder 是一个线程安全,JDK 8新加入的一个用来计数的工具类

按照作者的说法,LongAdder 在多个线程更新下比 AtomicLong 性能更好,但要消耗更多的空间

LongAdder 继承自 Striped64,其对一些简单情况做了处理(cell 存在且更新没有竞争),复杂情况交给 Striped64 的 longAccumulate。

2. Striped64

Striped64 设计思路是把多个线程分散到不同计数单元,减少线程竞争,提高并发效率

2.1 成员变量分析

1
2
3
4
5
6
7
8
// 可用 CPU 数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// cell 数组,大小为2的幂次方
transient volatile Cell[] cells;
// 基础偏移值
transient volatile long base;
// 0 无锁 1 有锁
transient volatile int cellsBusy;

2.2 Cell 类分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

Cell 类是 Striped64 的静态内部类。通过注解 `@sun.misc.Contended` 来自动实现缓存行填充,让 Java 编译器和 JRE 运行时来决定如何填充。本质上是一个填充了的、提供了 CAS 更新的 volatile 变量。

2.3 longAccumulate() 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 获取线程的 probe hash值,如果 seed 初始化,probe 为非0
if ((h = getProbe()) == 0) {
// 如果 probe 为0,就强制初始化一次
ThreadLocalRandom.current(); // force initialization
// get 到 probe
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// 通过 hash值获取数组 cells 一个index
if ((a = as[(n - 1) & h]) == null) {
// 当前位置为空,并且拿到锁(cellsBusy 0是无锁,1是有锁)
if (cellsBusy == 0) { // Try to attach new Cell
// 构建一个 Cell
Cell r = new Cell(x); // Optimistically create
// casCellsBusy 会把 cellsBuy 设置为1,也即是获取锁
if (cellsBusy == 0 && casCellsBusy()) {
// 创建标识
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
// 计算hash位置j
rs[j = (m - 1) & h] == null) {
// 把新构建的 Cell 塞到数组cells的index j的地方
rs[j] = r;
// 更新创建完成状态
created = true;
}
} finally {
// free lock
cellsBusy = 0;
}
// 如果完成直接退出
if (created)
break;
// 否则继续创建(失败)
continue; // Slot is now non-empty
}
}
// 执行到这里说明也是失败(没拿到锁),设置碰撞标识为false
collide = false;
}
// hash位置已经有值,则往下走
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 对当前位置累加,例如原本地方的值是1,要加1,现在则为2。成功就退出
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
// cells 长度大于cpu数量,设置碰撞标识为false
collide = false; // At max size or stale
else if (!collide)
// 碰撞标识设置为 true
collide = true;
// 说明前面操作没有成功,再次尝试获取锁进行扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
// 扩容2倍,然后数组copy
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// lock free
cellsBusy = 0;
}
// 扩容后重试
collide = false;
continue; // Retry with expanded table
}
// 重新计算 hash 值
h = advanceProbe(h);
}
// 1. 初始化 cells,
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 最开始容量是2
Cell[] rs = new Cell[2];
// hash对应位置赋值
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 初始化失败,CAS 把 value 累加到 base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

3. add() 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// cells 为空直接使用 cas 赋值,cas成功直接返回
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// cas 失败 || cells 不为空 且 index 处为null || cas 再次修改失败
// 调用 Striped64 的 longAccumulate
longAccumulate(x, null, uncontended);
}
}

4. sum() 分析

熟悉 ConcurrentHashMap 的同鞋看到 sum 相比已经很熟悉,惰性按需计算,可能会不太精准

1
2
3
4
5
6
7
8
9
10
11
12
13
public long sum() {
Cell[] as = cells; Cell a;
// 先统计 base的值
long sum = base;
if (as != null) {
// 再遍历 cells 中的值进行累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

5. reset() 分析

遍历 cells 数组,重置为0

1
2
3
4
5
6
7
8
9
10
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}

总结

可以看到 LongAdder 的核心思路就是保证高并发最坏的情况,通过对线程进行散列分片减少竞争时长,利用上了多核的性能。这种设计方式和 CSAPP 中 提高并行性.md#%E6%8F%90%E9%AB%98%E5%B9%B6%E8%A1%8C%E6%80%A7](https://github.com/itliusir/CS_Notes/blob/master/操作系统/操作系统(十二).md#提高并行性) 提到的方式是一样的。

回到开篇 TOP 1 问题,可以看到 LongAdder 主要目的是解决高并发下 AtomicLong 自旋开销问题 。