publicvoidrun(){ // 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次校验线程状态 if (c != null && state == NEW) { // 注意盯着这个运行结果变量 V result; boolean ran; try { // 任务执行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 执行异常就修改线程状态为 EXCEPTIONAL setException(ex); } if (ran) // 执行正常就修改线程的状态为 NORMAL set(result); } } finally { runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果线程状态没完成,就进入等待队列 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 自旋 for (;;) { if (Thread.interrupted()) { removeWaiter(q); thrownew InterruptedException(); }
int s = state; // 已完成就返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 快完成(异常),就等一会 elseif (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次进来一般会走到这里,把当前线程构建一个等待节点 elseif (q == null) q = new WaitNode(); // 第二次循环尝试把节点入队 elseif (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有超时时间 elseif (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 如果发现入队失败(已经入队过了),就挂起当前线程 else LockSupport.park(this); } }