Synchronized 源码分析

TOP 带着问题看源码

  1. Synchronized 怎么做到同步的
  2. Synchronized 做了哪些优化
  3. Synchronized 有哪些缺点

1. Synchronized 基本介绍

前面我们已经介绍和分析了管程,而 Synchronized 则是 JVM 层面中管程的一种实现,它通过对细节的屏蔽方便了开发人员的使用。

2. 对象的内存结构

在 HotSpot 虚拟机中,对象在堆内存中的存储布局可以划分为三个部分:对象头、实例数据、对齐填充

我们可以通过使用工具 jol 打印对象的结构

obj example

1
2
3
4
5
public class ObjDemo {
int a;
String s;
long l;
}

print obj struct

1
2
3
4
5
6
import org.openjdk.jol.info.ClassLayout;

public void printObjStruct () {
System.out.println(
ClassLayout.parseInstance(new ObjDemo()).toPrintable());
}

result

1
2
3
4
5
6
7
8
9
10
OFFSET  SIZE               TYPE DESCRIPTION                               VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 44 c1 00 f8 (01000100 11000001 00000000 11111000) (-134168252)
12 4 int ObjDemo.a 0
16 8 long ObjDemo.l 0
24 4 java.lang.String ObjDemo.s null
28 4 (loss due to the next object alignment)
Instance size: 32 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

2.1 对象头

HotSpot 虚拟机对象的对象头包括两类信息,一类是存储对象运行时数据,我们称为 Mark Word ;一类是类型指针,即对象指向它的类型元数据的指针(虚拟机通过该指针确定对象是哪个类的实例),我们称为 klass pointer

2.1.1 Klass Pointer

存放的是该对象对应的类的指针(指向方法区的内存区域)

2.1.2 Mark Word

markOop.hpp

1
2
3
4
5
6
7
8
9
10
11
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
// size:64 ----------------------------------------------------->| (CMS free block)
//
// unused:25 hash:31 -->| cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && normal object)
// JavaThread*:54 epoch:2 cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && biased object)
// narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object)
// unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)

由 markOop 的源码注释,我们可以知道 mark word 在不同的状态下都存哪些东西。

对象的状态一共有五种:无锁(0 01)、可偏向锁(1 01)、轻量级锁(_ 00)、重量级锁(_ 10)、GC状态(_ 11)

注意:

  1. age 一直为4位的原因是我们 gc 的年龄最大是15就会被回收到老年代,所以 4 个 bit位就可以表示
  2. lock 2位只能表示4种状态,加上偏向标志biased lock 1位则可以表示5种状态
  3. epoch 是用来标示是否已经重偏向

通过对象头的结构我们可以知道,同步锁实际是存储在对象头中的,不难推断其同步的方式就是通过对象与线程的绑定占有来实现排他的效果。 Synchronized 关键字在编译后会在同步块的前后生成 monitorentermonitorexit 这两个字节码指令,JVM 会对其进行解析,如果锁的是对象则就是对该对象加锁解锁,如果是类方法则是对 Class 对象加锁解锁,如果是实例方法则是对对应的对象加锁解锁。

回到 TOP 1 问题,可以知道同步是通过对象与线程的绑定记录来实现的。

2.2 实例数据

真正存储的有效信息,默认顺序会按照虚拟机的默认分配顺序, 如果 -XX:CompactFields 参数为 true (默认为true),子类中较小的变量页允许插入到父类变量的空隙中。

2.3 对齐填充

由于 HotSpot 虚拟机的自动内存管理系统要求对象起始地址必须是 8 字节的整数倍,因此如果对象实例数据部分没有对齐的话就需要对齐填充来补全

3. monitorenter 源码解析

我们主要解析 monitorenter 加锁的例子。

3.1 入口

HotSpot 虚拟机实现了两种的解释器,分别是模板解释器 和 C++解释器,默认使用的是模板解释器。

C++ 解释器也即是我们平时用来实现功能的方法,简单明了但是很慢;模板解释器是跳过了编译器,自己使用汇编代码来做的,比较难懂。

所以 monitorenter 的两个入口,我们从 C++ 解释器的入口分析,更加容易明白。

3.2 偏向锁&轻量级锁

3.2.1 偏向锁的获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
CASE(_monitorenter): {
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// 寻找空闲的锁记录(Lock Record) 空间
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
// 存在空闲的Lock Record
if (entry != NULL) {
// Lock Record 的 obj 指针指向锁对象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// 如果锁对象的对象头标志是偏向模式(1 01)
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
// 通过位运算计算anticipated_bias_locking_value
anticipated_bias_locking_value =
// 将线程id与prototype_header(epoch、分代年龄、偏向模式、锁标志)部分相或
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident)
// 与锁对象的markword异或,相等为0
^ (uintptr_t)mark)
// 将上面结果中的分代年龄忽略掉
&~((uintptr_t) markOopDesc::age_mask_in_place);
// ① 为0代表偏向线程是当前线程 且 对象头的epoch与class的epoch相等,什么也不做
if (anticipated_bias_locking_value == 0) {
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
// ② 偏向模式关闭,则尝试撤销
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// try revoke bias
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// ③ 锁对象头的 epoch 与 class 的 epoch 不相等,尝试重偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// try rebias
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
// 有竞争重偏向失败,调用 monitorenter 锁升级
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
// ④ 未偏向任何线程,尝试偏向
else {
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
// CAS 尝试修改
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
// 有竞争偏向失败,调用 monitorenter 锁升级
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}

// 走到这里说明偏向的不是当前线程或没有开启偏向锁等原因
if (!success) {
// 轻量级锁逻辑 start
// 构造无锁状态 Mark Word 的 copy(Displaced Mark Word)
markOop displaced = lockee->mark()->set_unlocked();
// 将锁记录空间(Lock Record)指向Displaced Mark Word
entry->lock()->set_displaced_header(displaced);
// 是否禁用偏向锁和轻量级锁
bool call_vm = UseHeavyMonitors;
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 判断是不是锁重入,是的话把Displaced Mark Word设置为null来表示重入
// 置null的原因是因为要记录重入次数,但是mark word大小有限,所以每次重入都在栈帧中新增一个Displaced Mark Word为null的记录
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
// 若禁用则锁升级
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}

回到 TOP 2 问题,可以知道在真正系统同步之前对竞争态小的做了偏向锁和轻量级锁的优化。

3.2.2 偏向锁的撤销

这里的撤销并不是锁的释放,而是尝试获取偏向锁因为不满足条件把锁改为非偏向锁状态

  • JavaThread* thread 是指 java 中当前线程
  • BasicObjectLock* elem 包含对象头数据和 oop 指针
  • UseBiasedLocking 是指是否启动偏向锁标识,JVM 启动默认是启动偏向锁

获取偏向锁失败会进入下面逻辑,如果是支持偏向锁,走 fast_enter ,否则走 slow_enter

interpreterRuntime.cpp

1
2
3
4
5
6
7
8
9
10
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
...
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
...
IRT_END

synchronizer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 再次判断是否开启偏向锁
if (UseBiasedLocking) {
// 未处于全局安全点
if (!SafepointSynchronize::is_at_safepoint()) {
// 撤销或重偏向
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
// 处于全局安全点,撤销偏向锁
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}

slow_enter (obj, lock, THREAD) ;
}

biasedLocking.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// 获取对象头的 Mark Word
markOop mark = obj->mark();
// ① 是否是可偏向状态(后三位是否为 1 01) 且 Thread ID 为 null 且 attempt_rebias 为 false(如锁对象的hashcode方法被调用),需要撤销偏向锁
if (mark->is_biased_anonymously() && !attempt_rebias) {
markOop biased_value = mark;
// 构造非偏向的markword
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
// CAS 设置重新设置偏向锁状态(撤销)
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
// ② 是否是可偏向状态(后三位是否为 1 01)
} else if (mark->has_bias_pattern()) {
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
// ②-① 已经有线程对对象做了锁定,需要撤销偏向锁
if (!prototype_header->has_bias_pattern()) {
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
return BIAS_REVOKED;
// ②-② epoch 不相等,说明重偏向过,已过期,需要撤销偏向锁
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
// 不允许获取偏向锁,撤销锁
} else {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
// 批量重偏向 批量撤销
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
} else if (heuristics == HR_SINGLE_REVOKE) {
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
// 需要撤销的是偏向当前线程的锁
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
// 调用revoke_bias方法
// revoke_bias 主要逻辑
// 1. 查看当前线程是否存活,不存活直接撤销
// 2. 偏向的线程如果不在同步块直接撤销(通过遍历线程栈的Lock Record来判断)
// 3. 轻量级锁逻辑
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}

assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
// 批量撤销、重定向
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}

3.3 重量级锁

synchronizer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 如果是无锁状态
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// 轻量级锁重入
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
...
// 锁标记为_ 10,膨胀为重量级锁,调用 enter 方法
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

3.3.1 膨胀过程

膨胀过程很简单,就是对不同状态的对象进行不同的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
assert (Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant") ;

for (;;) {
const markOop mark = object->mark() ;
// ① 如果已经是重量级锁状态直接返回
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
return inf ;
}
// ② 如果是正在膨胀中,重试等待另一个膨胀完
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
// ③ 如果是轻量级锁状态
if (mark->has_locker()) {
// 先初始化 ObjectMonitor 对象
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// 设置状态为膨胀中
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
markOop dmw = mark->displaced_mark_helper() ;
// Setup monitor fields to proper values -- prepare the monitor
m->set_header(dmw) ;
m->set_owner(mark->locker());
m->set_object(object);
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
// 将锁对象头设置为重量级锁状态
object->release_set_mark(markOopDesc::encode(m));

if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
// ④ 如果是无锁状态
ObjectMonitor * m = omAlloc (Self) ;
// prepare m for installation - set monitor to initial state
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
// CAS 设置对象头标志为重量级锁
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
// 有竞争设置失败,释放monitor重试
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
}

// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
}

3.3.2 获取重量级锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
// 无锁状态,通过 CAS 直接获得锁
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
return ;
}
// 锁重入,计数,直接返回
if (cur == Self) {
_recursions ++ ;
return ;
}
// 当前线程之前是轻量级锁状态,重置owner为当前线程(之前是执行锁记录(Lock Record)的指针),重置重入计数为1
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
Self->_Stalled = intptr_t(this) ;
// 调用系统同步操作之前,先尝试自旋获得锁
if (Knob_SpinEarly && TrySpin (Self) > 0) {
Self->_Stalled = 0 ;
return ;
}

JavaThread * jt = (JavaThread *) Self ;
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;

{
...
for (;;) {
jt->set_suspend_equivalent();
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;

jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
...
}

3.3.3 调用 EnterI 方法获得锁

核心逻辑分为三步:

  1. 将当前线程封装为 node 塞到队列 cxq 的队头
  2. 调用 park 挂起当前线程
  3. 被唤醒后再次尝试获取锁(在唤醒时候会根据不同的唤醒策略定义 cxq 与 EntryList 的优先级)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
// 尝试获取锁
if (TryLock (Self) > 0) {
return ;
}

DeferredInitialize () ;
// 自旋
if (TrySpin (Self) > 0) {
return ;
}
// 如果没获取到锁(锁被占用)就把线程封装到node节点中
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
// 将node节点插入到 cxq 队列的头部
node._next = nxt = _cxq ;
// CAS 成功直接返回
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// CAS 失败走这里,重试获取锁
if (TryLock (Self) > 0) {
return ;
}
}
// 如果没有等待的线程,把_Responsible设置为自己
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;

for (;;) {
// 再次尝试获取锁
if (TryLock (Self) > 0) break ;

if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}

// 调用 park 函数挂起当前线程
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}

if (TryLock(Self) > 0) break ;
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
// 在释放锁时,_succ会被设置为 EntryList 或 cxq 中的一个线程
if (_succ == Self) _succ = NULL ;

// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
// 上个死循环如果结束意味着获取到锁了
// 将当前线程 node 从 EntryList 或 cxq 移除
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
if (_Responsible == Self) {
_Responsible = NULL ;
OrderAccess::fence(); // Dekker pivot-point
}
if (SyncFlags & 8) {
OrderAccess::fence() ;
}
return ;
}

4. 总结

可以看到 Synchronized 的加锁过程非常复杂,但是核心设计还是在不同的竞争态选择不同的优化模式,尽量使同步的开销减少到最小。而重量级的实现可以发现就是管程的一种实现模式,对比并发包的 Java 层面实现的管程,Synchronized 的条件变量等待队列比较单一,只有 wait 一种。虽然灵活性不如并发包的锁,但是在异常不可控和代码维护方面 Synchronized 无疑是更好。

回到 TOP 3 问题,可以知道 Synchronized 缺点就是不灵活(条件变量、代码使用、超时时间),不公平,不能响应中断。