Java并发思维导图
Figure 1. Java并发思维导图

1. 多线程基础

1.1. 创建线程的方式

1.1.1. 继承Thread

public class ThreadA extends Thread {
    @Override
    public void run() {
    }
    public static void main(String[] args){
      new ThreadA().start();
    }
}

1.1.2. 实现Runnable接口, 创建Thread对象

public class ThreadB implements Runnable {
    @Override
    public void run() {
    }
    public static void main(String[] args){
      new Thread(new ThreadB()).start();
    }
}

1.1.3. 实现Callable接口, 创建FutureTask对象

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class ThreadC implements Callable<String> {
    @Override
    public String call() {
        return "Callable";
    }
    public static void main(String[] args) throws Exception {
      FutureTask<String> task = new FutureTask<>(new ThreadC());
      task.run();
      task.get();
    }
}

1.2. 关闭线程的方式

1.2.1. 设置关闭标志位

维护一个成员变量标识当前线程是否处于运行状态, 如果当前状态为停止, 则结束任务.

public class ThreadStopDemo1 extends Thread {

    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped){
            // working here. (1)
        }
    }

    public void stopWork() {
        this.stopped = true;
    }
}
1 如果内部一直busy或阻塞住, 那么就会无法响应外部的停止信号.

1.2.2. 中断线程

public class ThreadStopDemo2 extends Thread {

    @Override
    public void run() {
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            // stop working. (1)
        }
    }

    public static void main(String[] args) {
        Thread thread = new ThreadStopDemo2();
        thread.start();
        thread.interrupt(); (2)
    }
}
1 通过catch住异常来中断当前的TIME_WAITING状态.
2 唤醒指定线程.

只有方法签名里会抛出InterruptedException的方法才会抛出异常:

  • [Thread] public static native void sleep(long millis) throws InterruptedException;

  • [Thread] public final void join() throws InterruptedException

  • [Object] public final void wait() throws InterruptedException

1.3. 线程状态

  • NEW

  • RUNNABLE

  • WAITING

  • TIMED_WAITING

  • BLOCKED

  • TERMINATED

线程状态流转图
Figure 2. 线程状态流转图

2. 线程方法

2.1. run

调用Runnable对象的run方法.
如果使用继承Thread的方式来创建线程对象, 则会重写run方法.

2.2. start

start方法会运行当前创建出来的线程, 线程状态从 NEW 变为 RUNNABLE .
start方法不能重复调用.

2.3. sleep

当前线程从 RUNNABLE 变为 TIME_WAITING 状态.

如果sleep过程中调用方调用了这个线程的 interrupt() 方法, 则 sleep 方法会抛出 InterruptedException .

2.4. yield

当前线程从 RUNNING 变为 RUNNABLE 状态, 具体由操作系统实现, Thread.State 中均为 RUNNABLE 枚举值.

2.5. join

等待指定线程执行完.

join 是使用 wait 来实现的.

2.6. interrupt

打断 sleep/wait/join 的线程, 设置 interrupted 标志位为true.

2.7. interrupted

返回当前线程的打断标志, 然后重置为false.

  • 每一条线程都有自己的栈空间,拥有一份方法参数、局部变量和返回值的拷贝.每一个线程都有自己的一份标识信息,包括线程名、线程优先级、线程是否存活、线程执行状态、守护线程标识等.

  • 线程内异常无法在外层try-catch, 只能设置Thread的UncaughtExceptionHandler.

  • wait()释放对象锁, sleep()不释放.

3. volatile

  • 保障可见性: volatile修饰的变量值被修改后, 其他线程工作内存里缓存的该变量的值会标记为过期, 需要从主存重新取值.

  • 保障有序性: volatile变量读写前后会加入内存屏障, 保障读写指令不会乱序执行.

  • 修饰double/long时保证写入/读取的原子性, 但不保证代码块对字段操作的原子性.

3.1. 读写屏障来实现volatile语义

// StoreStoreFence
写操作
// StoreLoadFence

// LoadLoadFence
读操作
// LoadStoreFence

4. synchronized

4.1. 使用方式

  • 修饰成员方法

  • 修饰静态方法

  • 代码块指定修饰对象

4.2. 实现原理

synchronized基于Monitor实现. 在代码块前后和异常表 to 之后分别插入 monitorenter/monitorexit 指令.

Monitor锁对象组成:
* Owner 持有该Monitor锁的对象.
* EntryList 保存竞争该Monitor锁的 Blocked 状态的对象.
* WaitSet 保存竞争该Monitor锁的 Waiting 状态的对象.

  1. 线程1竞争锁时发现Owner为空, 则设置Owner为线程1.

  2. 线程2竞争锁, 发现Owner不为空, 则进入EntryList等待唤醒.

  3. 线程1释放锁, Owner置为空, 唤醒EntryList里的一个线程, 设置为Owner.

4.3. 锁升级的过程

4.3.1. MarkWord

无锁: \$ubrace("unused")_(25位) ubrace("hash")_(31位) ubrace("unused")_(1位) ubrace("age")_(4位) ubrace("bias_lock")_(1位)^0 ubrace("lock")_(2位)^01\$
偏向锁: \$ubrace("thread")_(54位) ubrace("epoch")_(2位) ubrace("unused")_(1位) ubrace("age")_(4位) ubrace("bias_lock")_(1位)^1 ubrace("lock")_(2位)^01\$
轻量级锁: \$ubrace("ptr_lock_record")_(62位) ubrace("lock")_(2位)^00\$
重量级锁: \$ubrace("ptr_monitor")_(62位) ubrace("lock")_(2位)^10\$

无锁 → 偏向锁 → 轻量级锁 → 重量级锁

4.3.2. 轻量级锁

加锁
  1. 每次竞争锁时, 线程栈帧中都会生成一个新的 LockRecord 对象(LockRecord地址 00 )和指向持有该线程锁的对象引用地址,

  2. 第一次有对象竞争这个线程的锁时, 把 LockRecord 地址和对象的MarkWord( hash age bias 01 )cas互换, 后面两位设置为 00 , 将栈帧中的对象引用指向这个对象.

  3. 如果cas失败:

    1. 锁竞争对象MarkWord中LockRecord地址指向当前线程, 则表示该次竞争属于锁重入, cas会设置自己的LockRecord地址为null, 将栈帧中的对象引用指向这个对象.

    2. 已经有其他线程持有了本线程的锁, 则进入 锁膨胀 的过程.

解锁
  1. 每次释放锁时cas检测是否栈顶的LockRecord对象记录的值是否为null:

    1. 如果为null, 表示有重入, 将该LockRecord对象出栈.

    2. 如果不为null, 用cas更新对象的MarkWord为LockRecord中的hash.

      1. 更新成功, 表示解锁成功, 出栈.

      2. 更新失败, 表示该LockRecord指向对象持有的是重量级锁.

4.3.3. 重量级锁

加锁
  1. 线程cas更新对象MarkWord里的hash值为自己的LockRecord地址值失败, 则表示已经有其他线程持有了这个对象的轻量级锁, 此时进入锁膨胀的过程.

  2. 申请一个Monitor对象

    1. 将Monitor的Owner地址指向此时MarkWord里的LockRecord地址.

    2. 将对象的MarkWord设置为Monitor对象的地址,

    3. 将当前线程放入Monitor的EntryList里.

解锁
  1. 将Monitor的Owner置为null.

  2. 从EntryList里唤醒一个线程让其持有这个Monitor锁, 设置为Owner.

4.3.4. 自旋锁

重量级锁加锁失败时, 会自旋尝试多次, 尝试失败后才会把自己加到EntryList里.

4.3.5. 偏向锁

对象初始化时, MarkWord最后三位设置为 \$101\$, 第一次竞争锁时线程id存储在MarkWord的前54位里, 下一次该线程竞争锁时可以直接进入代码同步块.

  • -XX:BiasedLockingStartupDelay=0 设置偏向锁不延迟打开.

偏向锁的撤销
  • JVM关闭偏向锁的功能. -XX:-UseBiasedLocking

  • 有其他线程竞争锁.

  • 调用wait/notify, 因为此时需要依赖Monitor对象的WaitSet.

5. LockSupport

每个线程都关联一个Parker对象, 由 _counter, _cond, _mutex 三部分组成.

LockSupport.park()
  • _counter 为0时, 进入阻塞状态.

  • _counter 为1时, 不进入阻塞状态, 继续运行.

  • 重置 _counter 为0.

LockSupport.unPark(Thread)
  • 如果线程处于阻塞状态, 就唤醒线程继续运行.

  • 如果线程处于运行状态, 则设置 _counter 为1, 线程继续运行.

6. CAS

6.1. 实现原理

基于CPU指令 cmpxchg 比较并交换, 如果提供的值与获取到的值相等则赋值成功, 否则赋值失败.

Unsafe.java
public final class Unsafe {
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset); (1)
        } while (!weakCompareAndSetInt(o, offset, v, v + delta)); (2)
        return v;
    }
}
1 获取变量最新的值.
2 CAS更新为最新的值.

6.2. ABA问题

如果另外一个线程把值从A改为B再改为A, 那么比较的时候会认为该值没有被修改过, 这种情况称之为ABA问题.

ABA问题的解决方案:
  • AtomicStampedReference: 用一个int变量标识当前值的版本号, 每次cas还需要提供新旧的版本号.

  • AtomicMarkableReference: 用boolean变量作为当前值的标志, 每次cas需要提供新旧的标志.

6.3. LongAddr

LongAddr内部维护一个base变量加多个单元, 并发高的情况下可以将CAS并发竞争的操作分摊到各个单元里.
最后取值的时候, 再对base+这些单元求和.
LongAddr求和的时候没有对单元加锁, 所以取值操作只满足最终一致性.

LongAddr.java
public class LongAddr extends Striped64 implements Serializable {
    public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        if ((cs = cells) != null || !casBase(b = base, b + x)) { (1)
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x))) (2)
                longAccumulate(x, null, uncontended); (3)
        }
    }
}
1 尝试cas修改base的值.
2 尝试cas修改当前线程所属单元的变量值.
3 操作cell数组, 设置值.
Striped64.java
abstract class Striped64 extends Number {
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        done: for (;;) {
            Cell[] cs; Cell c; int n; long v;
            if ((cs = cells) != null && (n = cs.length) > 0) {
                if ((c = cs[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) { (3)
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    break done;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (c.cas(v = c.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x))) (4)
                    break;
                else if (n >= NCPU || cells != cs)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == cs)        // Expand table unless stale
                            cells = Arrays.copyOf(cs, n << 1); (5)
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h); (6)
            }
            else if (cellsBusy == 0 && cells == cs && casCellsBusy()) { (1)
                try {                           // Initialize table
                    if (cells == cs) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        break done;
                    }
                } finally {
                    cellsBusy = 0;
                }
            }
            // Fall back on using base
            else if (casBase(v = base,
                             (fn == null) ? v + x : fn.applyAsLong(v, x))) (2)
                break done;
        }
    }
}
1 cells默认为null, cellsBusy默认为0, 所以LongAddr里如果cas修改base失败, 就会走到这里初始化cells数组, 并将x加入到cell中.
2 有其他线程在初始化cells, 所以当前线程无法在cells里面赋值, 只能尝试cas修改一次base.
3 当前线程所属的cell为空, 初始化一个cell放到cells数组里.
4 当前线程所属的cell不为空, 尝试cas修改该cell里的值.
5 尝试扩容cells.
6 修改当前线程存储的随机数, 使其归属到别的cell中, 避免竞争.

7. JUC锁

7.1. ReentrantLock

ReentrantLock类图
Figure 3. ReentrantLock类图
ReentrantLock与synchronized对比
  • ReentrantLock和synchronized都支持可重入.

  • ReentrantLock和synchronized都属于阻塞式同步.

  • synchronized使用C++实现的, ReentrantLock是JDK类库实现的.

  • ReentrantLock可中断, synchronized需要手动判断 interrupted 标志位.

  • ReentrantLock尝试加锁时可以设置超时时间.

  • ReentrantLock可以设置为公平锁(默认非公平锁).

  • ReentrantLock支持多个条件变量(Condition).

ReentrantLock 加锁释放锁都是通过内部继承了 AbstractQueuedSynchronizerSync 类来实现的, Sync又有公平锁和非公平锁区分, 默认为非公平锁.

ReentrantLock.java
public class ReentrantLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.lock();
    }
    public boolean tryLock() {
        return sync.tryLock();
    }
    public void unlock() {
        sync.release(1);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
}

7.1.1. lock方法实现

ReentrantLock.Sync.java
abstract static class Sync extends AbstractQueuedSynchronizer {
    final void lock() {
        if (!initialTryLock()) // 首先尝试先获得锁, 如果获取失败则与其他线程竞争来获取锁
            acquire(1);
    }
}
ReentrantLock.NonfairSync.java
static final class NonfairSync extends Sync {
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        if (compareAndSetState(0, 1)) { // 非公平锁首先尝试修改state, 竞争锁
            setExclusiveOwnerThread(current);
            return true;
        } else if (getExclusiveOwnerThread() == current) { // 如果是锁重入则直接增加state, lock成功
            int c = getState() + 1;
            if (c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        } else
            return false;
    }
}
ReentrantLock.FairSync.java
static final class FairSync extends Sync {
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) { // 公平锁只有当等待队列为空时才去尝试修改state竞争锁
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) { // 如果是锁重入则直接增加state, lock成功
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }
}
AbstractQueuedSynchronizer.java
public abstract class AbstractQueuedSynchronizer {

        public final void acquire(int arg) {
            if (!tryAcquire(arg)) // tryAcquire与initialTryLock类似, 会尝试去获取锁
                acquire(null, arg, false, false, false, 0L); // 竞争锁
        }

        final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued
        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg); // 尝试去竞争锁, 如果竞争成功则返回true
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node); // 如果当前竞争锁成功, 并且当前节点是头节点, 那么通知调用LockSupport.unpark唤醒当前节点的next节点
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode(); // 初始化Node
            } else if (pred == null) {          // 尝试将当前节点插入到等待队列末端, 并将当前tail的next指向成当前节点
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead(); // 如果head为null, 那么初始化head和tail为一个哑元节点
                else if (!casTail(t, node)) // 将当前节点设置成tail
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this); // 当前节点已经进入队列, 调用LockSupport.park暂停当前线程
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }
}

7.1.2. unlock方法实现

AbstractQueuedSynchronizer.java
public abstract class AbstractQueuedSynchronizer {
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            signalNext(head);
            return true;
        }
        return false;
    }

    private static void signalNext(Node h) {
        Node s;
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter); (1)
        }
    }
}
1 唤醒队列下一个节点.
ReentrantLock.Sync.java
abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException();
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null); (1)
        setState(c); (2)
        return free;
    }
}
1 清空持有当前锁的线程记录.
2 修改state值.

总的来说, AQS实现了四个功能: 加锁、释放锁、等待、唤醒.

7.2. Condition

AbstractQueuedSynchronizer.ConditionObject.java
public class ConditionObject implements Condition, java.io.Serializable {

    private int enableWait(ConditionNode node) {
        // 使用condition通信前必须加锁, 所以这里正常会返回true.
        if (isHeldExclusively()) {
            node.waiter = Thread.currentThread();
            node.setStatusRelaxed(COND | WAITING);
            // 将当前线程加入到队列中, 并更新尾结点.
            ConditionNode last = lastWaiter;
            if (last == null)
                firstWaiter = node;
            else
                last.nextWaiter = node;
            lastWaiter = node;
            int savedState = getState();
            // await释放锁
            if (release(savedState))
                return savedState;
        }
        node.status = CANCELLED;
        throw new IllegalMonitorStateException();
    }

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // ConditionNode是一个双向链表
        ConditionNode node = new ConditionNode();
        int savedState = enableWait(node);
        LockSupport.setCurrentBlocker(this);
        boolean interrupted = false, cancelled = false;
        // 判断是否在AQS的等待队列里(有其他线程notify时会将该node加入到等待队列里)
        while (!canReacquire(node)) {
            if (interrupted |= Thread.interrupted()) {
                if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                    break;
            } else if ((node.status & COND) != 0) {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            } else
                Thread.onSpinWait();
        }
        LockSupport.setCurrentBlocker(null);
        node.clearStatus();
        // 阻塞结束, 重新竞争锁.
        acquire(node, savedState, false, false, false, 0L);
        // 如果是内部中断导致的唤醒, 则继续抛出中断异常.
        if (interrupted) {
            if (cancelled) {
                unlinkCancelledWaiters(node);
                throw new InterruptedException();
            }
            Thread.currentThread().interrupt();
        }
    }

    public final void signal() {
        ConditionNode first = firstWaiter;
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        if (first != null)
            doSignal(first, false);
    }

    public final void signalAll() {
        ConditionNode first = firstWaiter;
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        if (first != null)
            doSignal(first, true);
    }

    private void doSignal(ConditionNode first, boolean all) {
        while (first != null) {
            ConditionNode next = first.nextWaiter;
            if ((firstWaiter = next) == null)
                // 队列为空, 清空尾结点
                lastWaiter = null;
            if ((first.getAndUnsetStatus(COND) & COND) != 0) {
                // 将当前第一个node加入到等待队列里, 从而唤醒一个await线程.
                enqueue(first);
                // 如果不是signalAll, 则只需要唤醒第一个node
                if (!all)
                    break;
            }
            first = next;
        }
    }
}

7.3. ReentrantReadWriteLock

ReentrantReadWriteLock 实现了读写/写写互斥, 但是读读不互斥.

ReentrantReadWriteLock类图
Figure 4. ReentrantReadWriteLock类图

ReentrantReadWriteLock 内部将state高16位维护持有共享锁线程的重入次数, 低16位维护持有排他锁线程的重入次数, 这是为了CAS无法同时对两个变量操作.
读写锁的加锁解锁分别依赖sync的方法来实现的, sync又分为公平锁和非公平锁, 所以一共有4种加锁解锁实现.

ReentrantReadWriteLock.ReadLock.java
public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1);
    }
    public void unlock() {
        sync.releaseShared(1);
    }
}
ReentrantReadWriteLock.WriteLock.java
public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1);
    }
    public void unlock() {
        sync.release(1);
    }
}
ReentrantReadWriteLock.java
abstract static class Sync extends AbstractQueuedSynchronizer {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    // state右移16位得到高16位的值.
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 将高16位置0, 得到低16位的值.
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    abstract boolean readerShouldBlock();
    abstract boolean writerShouldBlock();

    // 竞争排他锁
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) { // c不等于0, 代表当前有线程正在持有锁.
            // w为0代表有写线程持有锁, 如果持有锁的线程不是自己, 直接返回false.
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 当前线程之前已经持有排他锁, 此时为重入锁, 增加state即可.
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
        // 尝试竞争
            !compareAndSetState(c, c + acquires))
            return false;
        // CAS修改state成功, 设置自己为ExclusiveOwnerThread.
        setExclusiveOwnerThread(current);
        return true;
    }

    // 释放排他锁
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 排他锁持有锁时, 不会有其他线程修改state, 且因为state是低16位保存了排他锁重入次数, 所以这里直接减state即可.
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            // 重入锁释放锁结束.
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    // 竞争共享锁
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            // 如果当前存在排他锁且不是当前线程持有的排他锁则竞争失败.
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) { // 共享锁重入次数保存在state高16位, 每次需要增加1<<16.
            // 统计每个线程竞争共享锁次数.
            if (r == 0) {
                // 当前是竞争共享锁的第一个线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // 不停重试尝试CAS修改state.
        return fullTryAcquireShared(current);
    }

    // 释放共享锁
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) { // 因为会存在多个线程持有共享锁, 所以这里需要通过CAS修改state.
            int c = getState();
            int nextc = c - SHARED_UNIT;
            // state减去1<<16. 如果state剩0, 则可以唤醒等待队列下一个线程.
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        // 非公平锁会尝试竞争一次.
        return false;
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}

static final class FairSync extends Sync {
    // 公平锁判断如果等待队列中有其他线程, 则放弃竞争.
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

7.4. StampedLock

StampedLock 实现了写写互斥, 但是读读/读写不互斥.

7.4.1. StampedLock例子

filename.java
package me.jy.lang.thread.juc;

import java.util.concurrent.locks.StampedLock;
import java.util.stream.IntStream;

/**
 * @author jy
 */
public class StampedLockDemo {

    private final StampedLock lock = new StampedLock();

    private double x;
    private double y;

    public static void main(String[] args) {
        StampedLockDemo stampedLockDemo = new StampedLockDemo();
        IntStream.rangeClosed(1, 100)
            .parallel()
            .forEach(i -> stampedLockDemo.move(i, i + 1));
        // 7212
        System.out.println(stampedLockDemo.computeDistance());
    }

    public void move(double deltaX, double deltaY) {
        // 获取写锁
        long stamp = lock.writeLock();
        x += deltaX;
        y += deltaY;
        // 释放写锁
        lock.unlockWrite(stamp);
    }

    public double computeDistance() {
        // 尝试获取读锁(乐观锁)
        long stamp = lock.tryOptimisticRead();
        // 操作数据
        double currentX = x;
        double currentY = y;
        // 校验是否有线程获取过写锁
        if (!lock.validate(stamp)) {
            // 重新获取读锁(悲观锁)
            stamp = lock.readLock();
            currentX = x;
            currentY = y;
            // 释放锁
            lock.unlockRead(stamp);
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

}

7.4.2. 实现

StampedLock 的state变量维护了锁的版本号, 低7位标识了读锁状态, 第8位标识了写锁状态, 因为写锁标志只有1位, 所以写锁不能重入.

  • state : 默认为 \$1"<<"8\$ .

  • WBIT : 值为 \$1"<<"7\$ , 第8位为写锁标志.

  • RBITS : 值为 \$1"<<"7-1\$ , 低7位为读锁标志, RBITS减1即为读锁的数量.

  • ABITS : 值为 \$"WBIT"|"RBITS"\$ (即8个1).

  • SBITS : 值为 \$~"RBITS"\$

StampedLock.java
public class StampedLock implements java.io.Serializable {

    public long writeLock() {
        // 重置低8位, 只有当低8位都为0的时候可以直接获得锁.
        long s = U.getLongOpaque(this, STATE) & ~ABITS, nextState;
        // 设置第8位为1.
        if (casState(s, nextState = s | WBIT)) {
            U.storeStoreFence();
            return nextState;
        }
        return acquireWrite(false, false, 0L);
    }

    private long acquireWrite(boolean interruptible, boolean timed, long time) {
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        WriterNode node = null;
        Node pred = null;
        for (long s, nextState;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if ((first || pred == null) && ((s = state) & ABITS) == 0L &&
                casState(s, nextState = s | WBIT)) {
                U.storeStoreFence();
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (interrupted)
                        Thread.currentThread().interrupt();
                }
                return nextState;
            } else if (node == null) {          // retry before enqueuing
                node = new WriterNode();
            } else if (pred == null) {          // try to enqueue
                Node t = tail;
                node.setPrevRelaxed(t);
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {   // reduce unfairness
                --spins;
                Thread.onSpinWait();
            } else if (node.status == 0) {      // enable signal
                if (node.waiter == null)
                    node.waiter = Thread.currentThread();
                node.status = WAITING;
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted);
    }

    private long cancelAcquire(Node node, boolean interrupted) {
        if (node != null) {
            node.waiter = null;
            node.status = CANCELLED;
            cleanQueue();
            if (node instanceof ReaderNode)
                signalCowaiters((ReaderNode)node);
        }
        return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
    }

    private static void signalCowaiters(ReaderNode node) {
        if (node != null) {
            // 只要有一个ReadLock被唤醒, 就会唤醒所有的ReadLock.
            for (ReaderNode c; (c = node.cowaiters) != null; ) {
                if (node.casCowaiters(c, c.cowaiters))
                    LockSupport.unpark(c.waiter);
            }
        }
    }

    private long releaseWrite(long s) {
        long nextState = state = unlockWriteState(s);
        // 唤醒等待队列第一个节点.
        signalNext(head);
        return nextState;
    }

    private static long unlockWriteState(long s) {
        // 将state加上1<<7, 这样第8位就置0了.
        return ((s += WBIT) == 0L) ? ORIGIN : s;
    }

    // 获取乐观锁
    public long tryOptimisticRead() {
        long s;
        // state & WBIT 不为0时, 方法直接返回0, validate(0) 会返回false.
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }

}

8. JUC工具类

8.1. Semaphore

8.1.1. 使用场景

Semaphore 提供了资源的并发访问限制, 超过了并发量的线程将会阻塞一直到持有锁的线程释放锁.
内部也是自己继承了AQS来实现竞争/释放锁的功能, 有公平锁和非公平锁之分.

8.1.2. Semaphore例子

SemaphoreDemo.java
package me.jy.lang.thread.juc;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author jy
 */
@Slf4j
public class SemaphoreDemo {

    private static final Semaphore SEMAPHORE = new Semaphore(5);

    public static void main(String[] args) {
        IntStream
            .rangeClosed(1, 100)
            .parallel()
            .forEach(i -> {
                try {
                    SEMAPHORE.acquire();
                    log.info("I am in.");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException ignored) {
                } finally {
                    SEMAPHORE.release();
                }
            });
    }
}

8.1.3. 实现

AbstractQueuedSynchronizer.java
public abstract class AbstractQueuedSynchronizer {
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }
}
Semaphore.java
public class Semaphore implements java.io.Serializable {

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void release() {
        sync.releaseShared(1);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // CAS修改state, remaining小于0时代表剩余的state不够用了, 线程会去竞争锁.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                // 因为有多个线程持有该锁, 所以释放锁时需要不停CAS修改state.
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
}

8.2. CountDownLatch

8.2.1. 使用场景

  • 某个线程需要等待其他工作线程执行完毕, 再去执行下面的代码.

8.2.2. CountDownLatch例子

CountDownLatchDemo.java
package me.jy.lang.thread.juc;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author jy
 */
@Slf4j
public class CountDownLatchDemo {

    private static final int WORKER_COUNT = 10;

    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(WORKER_COUNT);
        Runnable work = () -> {
            try {
                log.info("working");
                TimeUnit.SECONDS.sleep(1);
                latch.countDown();
            } catch (InterruptedException ignored) {
            }
        };
        IntStream.rangeClosed(1, WORKER_COUNT).forEach(i -> executorService.execute(work));
        latch.await();

        log.info("workers done!");
        executorService.shutdownNow();
    }
}

8.2.3. 实现

CountDownLatch.java
public class CountDownLatch {

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            setState(count);
        }

        // 一直等到state为0时才能竞争到锁.
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // 不停地CAS将state减1
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    // state减到0, 可以唤醒等待线程.
                    return nextc == 0;
            }
        }
    }
}

8.3. CyclicBarrier

8.3.1. 使用场景

  • 一批线程互相等待统一就绪后, 再一起执行下面的代码.

8.3.2. CyclicBarrier例子

CyclicBarrierDemo.java
package me.jy.lang.thread.juc;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * @author jy
 */
@Slf4j
public class CyclicBarrierDemo {

    private static final int THREAD_COUNT = 5;

    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(THREAD_COUNT);

    private static final List<Player> PLAYERS = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT, () -> {
            log.info("====== Result ======");
            PLAYERS.stream()
                .sorted(Comparator.comparingLong(a -> a.finishedAt))
                .forEach(System.out::println);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.info("====== End ======");
            THREAD_POOL.shutdownNow();
        });

        IntStream.rangeClosed(1, THREAD_COUNT)
            .forEach(i -> THREAD_POOL.execute(() -> {
                try {
                    log.info(" is running!");
                    TimeUnit.SECONDS.sleep(i);
                    PLAYERS.add(new Player().setName(Thread.currentThread().getName()).setFinishedAt(Instant.now().getEpochSecond()));
                    cyclicBarrier.await();
                    log.info(" over!"); // 执行完barrierAction后才释放锁.
                } catch (Exception ignored) {
                }
            }));
    }

    @Data
    private static class Player {
        private String name;
        private long finishedAt;
    }

}

8.3.3. 实现

CyclicBarrier.java
public class CyclicBarrier {

    private static class Generation {
        Generation() {}
        // 标识当前线程是否被打断.
        boolean broken;
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            // 捕获外部中断信号
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            // 减到0, 则表示最后一个线程到达执行点
            if (index == 0) {  // tripped
                Runnable command = barrierCommand;
                if (command != null) {
                    try {
                        // 最后一个线程执行回调方法
                        command.run();
                    } catch (Throwable ex) {
                        breakBarrier();
                        throw ex;
                    }
                }
                nextGeneration();
                return 0;
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        // 当前线程阻塞, 等待最后一个线程唤醒
                        // await会释放锁, 让其他线程也在这里阻塞
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // 被唤醒后, generation被重新初始化为新的对象, 此处不相等, await方法就会结束.
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // 唤醒其他阻塞线程
        trip.signalAll();
        // 重置count计数, 下轮可以继续使用
        count = parties;
        // 重置标志位
        generation = new Generation();
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
}

8.4. Exchanger

8.4.1. 使用场景

  • 两个线程之间交换数据.

8.4.2. Exchanger例子

ExchangerDemo.java
package me.jy.lang.thread.juc;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

/**
 * @author jy
 */
@Slf4j
public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                // 阻塞等待线程B发送数据
                String data = exchanger.exchange("A");
                log.info(data);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                // 与线程A交换数据
                String data = exchanger.exchange("B");
                log.info(data);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "B").start();
    }
}

8.4.3. 实现

Exchanger.java
public class Exchanger<V> {

    private volatile Node[] arena;

    private volatile Node slot;

    private final Participant participant;

    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    public Exchanger() {
        participant = new Participant();
    }

    @jdk.internal.vm.annotation.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    public V exchange(V x) throws InterruptedException {
        Object v;
        Node[] a;
        Object item = (x == null) ? NULL_ITEM : x;
        // 先用slot交换数据, 如果slot被占用了, 就会用arena交换数据.
        if (((a = arena) != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            (Thread.interrupted() || // disambiguates null return
             (v = arenaExchange(item, false, 0L)) == null))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            // slot不为null表示有别的线程在等待交换数据
            if ((q = slot) != null) {
                // 重置SLOT, 拿到对方线程想要交换的数据
                if (SLOT.compareAndSet(this, q, null)) {
                    Object v = q.item;
                    // 将自己的数据设置到对方线程的match属性上.
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        // 唤醒该线程
                        LockSupport.unpark(w);
                    return v;
                }
                // 有其他线程修改了SLOT, CAS失败, 创建arena数组.
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    BOUND.compareAndSet(this, 0, SEQ)) // bound设置为256
                    arena = new Node[(FULL + 2) << ASHIFT]; // (CPU数/2+2)*32
            }
            // arena不为空则转为使用arena数组交换数据
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                // 将自己线程数据设置到item属性里, 然后尝试放入到SLOT中.
                if (SLOT.compareAndSet(this, null, p))
                    break;
                p.item = null;
            }
        }

        // 当前线程数据放入到slot中, 此处自旋, 直到对方线程取出.
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        // 自旋1024次
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        // 如果p.match为null, 说明还没有别的线程过来交换数据
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                p.parked = t;
                // 如果slot还没有被拿走, 阻塞自己.
                if (slot == p) {
                    if (ns == 0L)
                        LockSupport.park(this);
                    else
                        LockSupport.parkNanos(this, ns);
                }
                p.parked = null;
            }
            // ns小于0或者被中断, 取消交换直接返回.
            else if (SLOT.compareAndSet(this, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 清空双方交换的数据
        MATCH.setRelease(p, null);
        p.item = null;
        // 保留随机数, 供下次使用
        p.hash = h;
        return v;
    }
}

8.5. Phaser

8.5.1. 使用场景

  • 可以当CountDownLatch或CyclicBarrier用.

  • 与CyclicBarrier不同的是, Phaser可以在使用时动态修改需要同步的线程个数.

  • 支持Phaser嵌套, 可以设置Phaser的父Phaser.

8.5.2. Phaser例子

PhaserDemo.java
package me.jy.lang.thread.juc;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @author jy
 */
@Slf4j
public class PhaserDemo {

    private static final int PARTIES = 10;
    private static final Phaser PHASER = new Phaser(PARTIES);

    public static void main(String[] args) {
        PhaserDemo demo = new PhaserDemo();
        IntStream.rangeClosed(1, PARTIES)
            .parallel()
            .forEach(i -> new Thread(demo::countDownLatch).start());
        // 模拟CountDownLatch
        // 等待parties减到0
        PHASER.awaitAdvance(0);
        log.info("awaitAdvance passed");

        // 模拟CyclicBarrier
        IntStream.rangeClosed(1, PARTIES)
            .parallel()
            .forEach(i -> new Thread(demo::cyclicBarrier).start());
    }

    @SneakyThrows
    public void countDownLatch() {
        TimeUnit.SECONDS.sleep(200000);
        PHASER.arrive();
        log.info("arrived");
    }

    @SneakyThrows
    public void cyclicBarrier() {
        log.info("phase0");
        TimeUnit.SECONDS.sleep(2);
        // 等待其他线程执行完毕
        PHASER.arriveAndAwaitAdvance();
        log.info("phase1");
        TimeUnit.SECONDS.sleep(3);
        // 等待其他线程执行完毕
        PHASER.arriveAndAwaitAdvance();
        log.info("phase2");
    }

}

8.5.3. 实现

Phaser.java
public class Phaser {

    // 第1位标识是否同步完成, 第2~32为phase数, 第33~48位为总线程数, 第49~64位为未到达线程数.
    private volatile long state;

    private final Phaser parent;
    private final Phaser root;

    private final AtomicReference<QNode> evenQ;
    private final AtomicReference<QNode> oddQ;

    public Phaser(Phaser parent, int parties) {
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            final Phaser root = parent.root;
            this.root = root;
            this.evenQ = root.evenQ;
            this.oddQ = root.oddQ;
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        else {
            this.root = this;
            this.evenQ = new AtomicReference<QNode>();
            this.oddQ = new AtomicReference<QNode>();
        }
        // 设置2~32位, 33~48位, 49~64位 为parties
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase << PHASE_SHIFT) |
            ((long)parties << PARTIES_SHIFT) |
            ((long)parties);
    }

    public int awaitAdvance(int phase) {
        final Phaser root = this.root;
        long s = (root == this) ? state : reconcileState();
        // 右移32位得到state里的phase数
        int p = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            return phase;
        if (p == phase)
            return root.internalAwaitAdvance(phase, null);
        return p;
    }

    private int internalAwaitAdvance(int phase, QNode node) {
        // 唤醒阻塞在前面phase阶段的线程
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                boolean interrupted = Thread.interrupted();
                // 如果被打断, 或者自旋结束, 则创建一个QNode
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
                else
                    // 首先空自旋
                    Thread.onSpinWait();
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                // 加入到队列头部, phase为奇数时加入到evenQ, phase为偶数时加入到oddQ.
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    // 将当前node阻塞.
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException cantHappen) {
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        // 唤醒阻塞在当前phase阶段的线程
        releaseWaiters(phase);
        return p;
    }

    private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        Thread t;  // its thread
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    public int arrive() {
        // arrive后将未到达线程数减1.
        return doArrive(1);
    }

    public int arriveAndDeregister() {
        // arrive后将未到达线程数和总线程数都减1
        return doArrive(65537);
    }

    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // 将state减去adjust
            if (STATE.compareAndSet(this, s, s-=adjust)) {
                // 减到最后一个线程
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;
                    // 将总线程数赋值到未到达的线程数, 从而实现Phaser复用
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        // 先将phase+1, 再修改当前state
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        STATE.compareAndSet(this, s, n);
                        // 释放当前phase的线程
                        releaseWaiters(phase);
                    }
                    // 当前Phaser总线程数为0, 说明当前线程已经全部取消注册, 所以通知父级Phaser减65537(反注册一个总线程同时将未到达线程数减一)
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        STATE.compareAndSet(this, s, s | EMPTY);
                    }
                    else
                        // 通知父级Phaser减1.
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
}

8.7. ThreadLocal

ThreadLocal 在某个线程里存取数据, 使用裴波那契散列法+开放地址法存储, 使用裴波那契数列是为了尽量减少哈希碰撞, 让数据分布更加均匀.

每个 Thread 对象里有一个 ThreadLocalMap 成员变量, 用来存放 ThreadLocal 和对应的Value, ThreadLocal 对象使用 WeakReference 包装, 是为了防止回收 ThreadLocal 对象时, 因为Thread对象没有被回收且有强引用关联, 导致 ThreadLocal 对象回收不了.

ThreadLocal.java
public class ThreadLocal<T> {

    private final int threadLocalHashCode = nextHashCode();

    // 全局的hashCode
    private static AtomicInteger nextHashCode = new AtomicInteger();

    // 2^32*0.618 => int数据范围的黄金分割数
    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        // 从0开始, 每次递增黄金分割数
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            // 为当前的Thread初始化一个ThreadLocalMap, 将当前的ThreadLocal对象和值存在当前的线程对象里.
            createMap(t, value);
        }
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 根据ThreadLocal的hashCode找到槽位上的Entry
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 如果找不到Entry, 则获取初始Value, 并放入到哈希表中
        return setInitialValue();
    }

    // 每次ThreadLocal设置value后一定要手动remove掉, 因为线程池会复用线程, 下一次方法调用的时候会get到上一次ThreadLocal中存放的值.
    public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null) {
             m.remove(this);
         }
     }

    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
        if (this instanceof TerminatingThreadLocal) {
            TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
        }
        return value;
    }
}
ThreadLocal.ThreadLocalMap.java
static class ThreadLocalMap {

    // 哈希表初始化长度为16
    private static final int INITIAL_CAPACITY = 16;
    private Entry[] table;
    private int size = 0;
    private int threshold;

    // 哈希表里每个元素都是弱引用
    static class Entry extends WeakReference<ThreadLocal<?>> {
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 初始化哈希表
        table = new Entry[INITIAL_CAPACITY];
        // 计算ThreadLocal对象的槽位
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }

    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
        // 槽位
        int i = key.threadLocalHashCode & (len-1);

        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();

            // 槽位放的就是该ThreadLocal对象
            if (k == key) {
                e.value = value;
                return;
            }

            // e不为null, 但是get出来是null, 代表当前元素已被回收
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }

        // tab[i] 为空, 则直接插入Entry到数组里.
        tab[i] = new Entry(key, value);
        int sz = ++size;
        // 清理部分过期元素
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            // 如果哈希表中元素数量大于阈值就执行rehash
            rehash();
    }

    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            // 如果遇到过期Entry, 将n再次赋值为len, 以再次扫描log(n)次
            if (e != null && e.get() == null) {
                n = len;
                removed = true;
                i = expungeStaleEntry(i);
            }
        } while ( (n >>>= 1) != 0); // 控制扫描次数
        return removed;
    }

    private void rehash() {
        expungeStaleEntries();
        // 清理了过期元素后, 元素数量还超过阈值的3/4, 则扩容一倍
        if (size >= threshold - threshold / 4)
            resize();
    }

    private void expungeStaleEntries() {
        Entry[] tab = table;
        int len = tab.length;
        for (int j = 0; j < len; j++) {
            Entry e = tab[j];
            if (e != null && e.get() == null)
                expungeStaleEntry(j);
        }
    }

    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        // 清除当前槽位上的Entry
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;
        Entry e;
        int i;
        // 清除当前槽位的同时, 遍历下面不为空的槽位
        for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            // 清除过期Entry
            if (k == null) {
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                // 重新计算槽位
                int h = k.threadLocalHashCode & (len - 1);
                // Entry所属的槽位和当前槽位不同, 移动该Entry到h或者h后面第一个空的槽位
                if (h != i) {
                    tab[i] = null;
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i;
    }

    private void resize() {
        Entry[] oldTab = table;
        int oldLen = oldTab.length;
        // 每次扩容两倍
        int newLen = oldLen * 2;
        Entry[] newTab = new Entry[newLen];
        int count = 0;
        for (Entry e : oldTab) {
            if (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null; // Help the GC
                } else {
                    // 重新计算槽位
                    int h = k.threadLocalHashCode & (newLen - 1);
                    // 如果当前槽位有元素了就瞬移一位.
                    while (newTab[h] != null)
                        h = nextIndex(h, newLen);
                    newTab[h] = e;
                    count++;
                }
            }
        }
        setThreshold(newLen);
        size = count;
        table = newTab;
    }

    private Entry getEntry(ThreadLocal<?> key) {
        // 计算槽位
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        // 命中entry
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }

    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;
        // 向后遍历, 寻找Entry
        while (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == key)
                return e;
            // 清除过期key
            if (k == null)
                expungeStaleEntry(i);
            else
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null;
    }

    // 开放地址法
    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }

    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        // 从i向后找一直找到key
        for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                // 清除当前元素
                e.clear();
                expungeStaleEntry(i);
                return;
            }
        }
    }
}

总之, ThreadLocalMap 在get/set/remove都会尝试清除哈希表中的过期元素.

9. 并发容器

9.1. BlockingQueue

9.1.1. api

方法 抛出异常 返回特殊值 阻塞 超时时间控制

插入

boolean add(e) 队列满了抛出 IllegalStateException 异常, 否则插入成功返回true.

boolean offer(e) 队列满了返回false, 否则插入成功返回true.

void put(e) 队列满了会一直阻塞, 一直到可以插入.

boolean offer(e, time, unit) 队列满了会等待指定时间直到能插入.

删除

E remove() 队列如果为空抛出 NoSuchElementException 异常, 否则直接删除队列头部元素.

E poll() 队列如果为空返回null, 否则直接删除队列头部元素.

E take() 队列为空会一直阻塞, 一直到队列存在元素.

poll(time, unit) 队列为空会等待指定时间直到能删除.

查看头部元素

E element() 队列如果为空抛出 NoSuchElementException 异常, 否则返回队列头部元素.

E peek() 队列如果为空返回null, 否则返回队列头部元素.

-

-

9.1.2. ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的环形队列, 初始化时会指定数组的长度.
插入和删除使用 ReentrantLock 实现加锁和释放锁了来实现线程安全.

ArrayBlockingQueue.java
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;

    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            // 队列满了抛出异常
            throw new IllegalStateException("Queue full");
    }

    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E e) {
        final Object[] items = this.items;
        // 将元素插入到items数组里
        items[putIndex] = e;
        // putIndex即为队列尾部索引
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 如果数组满了则阻塞住
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        // 如果数组满了则抛出异常
        else
            throw new NoSuchElementException();
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        // 删除数组元素
        items[takeIndex] = null;
        // takeIndex即为队列头部索引
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒阻塞在notFull的线程
        notFull.signal();
        return e;
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 数组为空时阻塞
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E element() {
        E x = peek();
        if (x != null)
            return x;
        // 数组为空时抛出异常
        else
            throw new NoSuchElementException();
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 直接返回takeIndex所在元素(队列头部)
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

}

9.1.3. LinkedBlockingQueue

LinkedBlockingQueue 是基于单向链表实现的阻塞队列, 默认最大容量为 Integer.MAX_VALUE .
内部分别为put和take持有锁, 所以put和put互斥, take和take互斥, put和take不互斥.

LinkedBlockingQueue.java
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    // 单向链表节点
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;
    private transient Node<E> last;

    private final ReentrantLock takeLock = new ReentrantLock();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final Condition notFull = putLock.newCondition();

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 判断队列是否满了
            if (count.get() == capacity)
                return false;
            // 加入到队尾
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 队列满了阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                // LinkedBlockingQueue的put也会唤醒其他的put操作
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        final E x;
        final int c;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 队列为空返回null
            if (count.get() == 0)
                return null;
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                // LinkedBlockingQueue的poll也会唤醒其他的poll操作
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            // 唤醒阻塞在notFull的线程
            signalNotFull();
        return x;
    }

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        // head为标识队列头的哑元节点, 此处将head.next变为head, 并将item清除
        E x = first.item;
        first.item = null;
        return x;
    }

}

9.1.4. PriorityBlockingQueue

PriorityBlockingQueue 会将元素排好序存放.

PriorityBlockingQueue.java
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    private transient Object[] queue;

    private transient Comparator<? super E> comparator;

    private final ReentrantLock lock = new ReentrantLock();
    // PriorityBlockingQueue因为会无限增长, 所以只会有notEmpty的等待条件
    private final Condition notEmpty = lock.newCondition();

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.comparator = comparator;
        this.queue = new Object[Math.max(1, initialCapacity)];
    }

    // 不存在插入不了的情况, 所以add/put都直接调用的offer
    public boolean add(E e) {
        return offer(e);
    }

    public void put(E e) {
        offer(e); // never need to block
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] es;
        // 如果queue数组满了, 则扩容
        while ((n = size) >= (cap = (es = queue).length))
            tryGrow(es, cap);
        try {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftUpComparable(n, e, es);
            else
                siftUpUsingComparator(n, e, es, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
            try {
                // 扩容两倍或直接+2
                int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
                int newCap = ArraysSupport.newLength(oldCap, 1, growth);
                if (queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            // 如果队列没有元素, 则阻塞
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    // 二叉堆删除元素
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        final Object[] es;
        final E result;

        if ((result = (E) ((es = queue)[0])) != null) {
            final int n;
            final E x = (E) es[(n = --size)];
            es[n] = null;
            if (n > 0) {
                final Comparator<? super E> cmp;
                if ((cmp = comparator) == null)
                    siftDownComparable(0, x, es, n);
                else
                    siftDownUsingComparator(0, x, es, n, cmp);
            }
        }
        return result;
    }

}

9.1.5. DelayQueue

DelayQueue 是一个按照延迟时间从小到大出队的优先队列.

DelayQueue.java
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private Thread leader;
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private final transient ReentrantLock lock = new ReentrantLock();
    // queue非空的条件变量
    private final Condition available = lock.newCondition();

    // 与PriorityBlokingQueue一样, 没有容量的限制, 所以add/put都直接调用的offer
    public boolean add(E e) {
        return offer(e);
    }

    public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 只有当堆顶的元素为新插入的元素才需要唤醒阻塞在take的线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            // 当第一个元素的超时时间大于0的时候, 直接返回null, 否则返回第一个元素
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 取出延迟时间最小的元素
                E first = q.peek();
                // 如果为空则阻塞
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // 如果有其他线程在等待元素, 则阻塞住.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待队列头部元素的delay时间
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                // 取出队列头部的元素后, 唤醒其他阻塞在take的线程
                available.signal();
            lock.unlock();
        }
    }
}

9.1.6. SynchronousQueue

SynchronousQueue 本身不存储元素, 插入元素后会阻塞, 一直到有其他线程取出元素.
存在非公平和公平两种模式(默认非公平), 公平下第一次take的元素为第一个put进去的元素, 非公平下第一次take的元素为最后一个put进去的元素.

SynchronousQueue.java
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 默认为非公平
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 插入元素时第一个参数传这个元素
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    public E poll() {
        // 取出元素时第一个参数传null
        return transferer.transfer(null, true, 0);
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

}

9.2. BlockingDeque

BlockingDeque 继承了 BlockingQueue , 增加了对队列头部和尾部元素操作的api.
BlockingDeque 的实现类只有 LinkedBlockingDeque .

9.2.1. LinkedBlockingDeque

LinkedBlockingDeque.java
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {

    // 双向链表节点
    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;
        Node(E x) {
            item = x;
        }
    }

    // 头节点
    transient Node<E> first;
    // 尾节点
    transient Node<E> last;
    // 节点总数
    private transient int count;
    // 链表容量
    private final int capacity;

    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 队列满了会阻塞
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

    private boolean linkFirst(Node<E> node) {
        if (count >= capacity)
            return false;
        Node<E> f = first;
        // node的next为当前的头节点
        node.next = f;
        // 将头节点改为node
        first = node;
        if (last == null)
            last = node;
        else
            f.prev = node;
        ++count;
        // 唤醒阻塞在notEmpty条件变量的线程
        notEmpty.signal();
        return true;
    }

    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 链表空时会阻塞
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

    private E unlinkLast() {
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        // 改掉last.prev指针, 防止强引用无法回收last节点
        l.prev = l;
        // last = last.prev
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        // 唤醒阻塞在notFull条件变量的线程
        notFull.signal();
        return item;
    }
}

9.3. CopyOnWrite

CopyOnWrite 指的是在写数据的时候将源数据拷贝出来一份作修改, 然后将改后的数据作为源数据的引用.

9.3.1. CopyOnWriteArrayList

CopyOnWriteArrayList.java
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    // 写操作的锁对象(synchronized锁)
    final transient Object lock = new Object();

    private transient volatile Object[] array;

    public boolean add(E e) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            // 先拷贝出来一份数组
            es = Arrays.copyOf(es, len + 1);
            // 写入到拷贝出来的数组
            es[len] = e;
            // 重新赋值数组
            setArray(es);
            return true;
        }
    }

    public boolean addIfAbsent(E e) {
        Object[] snapshot = getArray();
        // 先不加锁判断数组是否包含该元素, 然后加锁再判断一次, 如果不存在则插入元素
        return indexOfRange(e, snapshot, 0, snapshot.length) < 0
            && addIfAbsent(e, snapshot);
    }

    public E remove(int index) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            E oldValue = elementAt(es, index);
            int numMoved = len - index - 1;
            Object[] newElements;
            if (numMoved == 0)
                // 如果删除的是最后一个元素, 则直接复制数组的0~len-1位
                newElements = Arrays.copyOf(es, len - 1);
            else {
                // 将删除位置的左右两侧复制到新的数组里
                newElements = new Object[len - 1];
                System.arraycopy(es, 0, newElements, 0, index);
                System.arraycopy(es, index + 1, newElements, index,
                                 numMoved);
            }
            setArray(newElements);
            return oldValue;
        }
    }
}

9.3.2. CopyOnWriteArraySet

CopyOnWriteArraySet 内部还是使用的 CopyOnWriteArrayList , 只是添加元素的时候判断不存在才添加.

CopyOnWriteArraySet.java
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {

    private final CopyOnWriteArrayList<E> al;

    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }

    // 判断不存在则插入元素
    public boolean add(E e) {
        return al.addIfAbsent(e);
    }
}

9.4. ConcurrentLinkedQueue

ConcurrentLinkedQueue 通过对head和tail节点的next指针进行CAS操作实现入队和出队, 而非使用ReentrantLock悲观锁直接操作队列的head和tail节点.
个人猜测是因为CAS只能操作一个变量, 没法同时更新tail和tail.next, 所以插入和删除时优先更新tail.next指针和head.item, 然后才尝试cas修改head和tail节点.

ConcurrentLinkedQueue.java
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {

    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        Node(E item) {
            ITEM.set(this, item);
        }
        Node() {}

        void appendRelaxed(Node<E> next) {
            NEXT.set(this, next);
        }

        boolean casItem(E cmp, E val) {
            return ITEM.compareAndSet(this, cmp, val);
        }
    }

    public boolean offer(E e) {
        final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // 1. tail.next=newNode
                if (NEXT.compareAndSet(p, null, newNode)) {
                    if (p != t)
                        // 3. 第二步操作后, p就不等于tail, 这时候更新tail=tail.next(newNode)
                        TAIL.weakCompareAndSet(this, t, newNode);
                    return true;
                }
            }
            // 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 更新下p和t变量
            else if (p == q)
                p = (t != (t = tail)) ? t : head;
            else
                // 2. 第一次没有去更新tail, 只是设置了tail的next指针. 所以q!=null, 这时更新p=p.next
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    public E poll() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                // 第一次删除节点时, 只是将head.item设置为null
                // 第二次删除节点时, 先将p=p.next, 然后将更新后p的next清空
                if ((item = p.item) != null && p.casItem(item, null)) {
                    // 第二次删除元素后, p=p.next, 所以此时p!=h
                    if (p != h)
                        // 如果p.next不为null, 则将head更新为p.next(存储数据的节点), 否则更新为p(哑元节点)
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 如果p和p.next都为null, 代表整个队列都没节点, 可以直接返回null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 重新遍历来取到最新的head
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

    public boolean isEmpty() {
        return first() == null;
    }

    Node<E> first() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                // 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 重新遍历来取到最新的head
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

    final void updateHead(Node<E> h, Node<E> p) {
        // 更新head为p
        if (h != p && HEAD.compareAndSet(this, h, p))
            // 将旧head节点的next指针指向自己, 防止旧head节点因为next指针指向新的head(强引用)导致不被GC
            NEXT.setRelease(h, h);
    }
}

9.5. ConcurrentLinkedDeque

ConcurrentLinkedDeque 的实现与 ConcurrentLinkedQueue 类似, 区别是 ConcurrentLinkedDeque 使用双向链表存储节点.

10. 线程池

10.2. 使用

10.2.1. ThreadPoolExecutor构造函数参数

  • int corePoolSize : 核心线程数个数

  • int maximumPoolSize : 最大线程数个数

  • long keepAliveTime : 核心线程之外的线程存活时间

  • TimeUnit unit : KeepAliveTime时间单位

  • BlockingQueue<Runnable> workQueue : 线程池所用的阻塞队列类型

  • ThreadFactory threadFactory : 线程创建的工厂类

  • RejectedExecutionHandler handler : 最大线程满载后的线程提交后拒绝策略

10.2.2. 线程池分类

  • FixedThreadPool: 核心线程数等于最大线程数, 使用无界队列存储多余任务, 适用于任务量已知且耗时的场景.
    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

  • CachedThreadPool: 没有核心线程数, 适用于任务执行时间短且密集的场景.
    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

  • SingleThreadPool: 核心线程数和最大线程数都为1, 适用于希望任务排队串行执行的场景.
    new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()))

10.3. 线程池状态

Diagram
状态名 标志 是否接收新任务 是否处理阻塞队列任务 说明

RUNNING

111

SHUTDOWN

000

×

不会接收新的任务, 只会处理阻塞队列中剩余的任务.

STOP

001

×

×

中断正在执行的任务, 抛弃阻塞队列中的任务.

TIDYING

010

×

×

任务全部执行完毕, 活动线程为0, 即将进入终结状态.

TERMINATED

011

×

×

终结状态

10.4. 线程调度流程

  1. 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

  2. 如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

  3. 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

  4. 如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

10.5. 线程池关闭

10.5.1. shutdown

线程池状态更新为 SHUTDOWN , 只执行所有已提交的任务(包括阻塞队列里的任务), 不再接受新的任务.

    private static void shutdown() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            SleepUtil.sleep(1000);
            log.info("sleep 1s");
        });
        executorService.execute(() -> {
            SleepUtil.sleep(2000);
            log.info("sleep 2s");
        });
        executorService.execute(() -> {
            SleepUtil.sleep(3000);
            log.info("sleep 3s");
        });
        executorService.shutdown(); (1)
        log.info("shutdown called");
        executorService.execute(() -> System.out.println("no more")); (2)
        (3)
    }
1 线程池状态变为SHUTDOWN, 只会执行已经提交的任务.
2 reject任务.
3 等待任务1/2/3执行完

10.5.2. shutdownNow

调用所有核心线程的interrupt()方法, 并直接返回阻塞队列里的任务.

    private static void shutdownNow() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            SleepUtil.sleep(10000);
            log.info("sleep 10s");
        });
        executorService.execute(() -> {
            SleepUtil.sleep(20000);
            log.info("sleep 20s");
        });
        executorService.execute(() -> {
            SleepUtil.sleep(30000);
            log.info("sleep 30s");
        });
        List<Runnable> runnables = executorService.shutdownNow(); (1)
        log.info("shutdownNow: {}", runnables); (2)
        log.info("shutdown called");
        executorService.execute(() -> System.out.println("no more")); (3)
        (4)
    }
1 调用所有核心线程的interrupt()方法, 并直接返回阻塞队列里的任务.
2 返回任务3.
3 reject任务.
4 等待任务1/2执行完

10.6. 实现

10.6.1. 线程池的关闭

  1. 外部调用线程池的 shutdown 或者 shutdownNow 方法.

  2. 外部循环调用线程池的 awaitTermination 方法.

  3. 如果调用的是 shutdown , 线程池会打断所有的空闲线程, 否则直接打断所有的线程, 并将阻塞队列的线程对象返回出来.

  4. 线程池状态升为 TIDYING .

  5. 执行线程池 terminated 模板方法.

  6. 线程池状态升为 TERMINATED .

  7. 唤醒阻塞在 awaitTermination 方法的外部调用线程.

ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {

    // 前3位标识线程池的状态, 后29位标识线程数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // 线程池的5种状态常量
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 线程池的当前状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private final ReentrantLock mainLock = new ReentrantLock();
    private final Condition termination = mainLock.newCondition();

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 设置线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 打断空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 设置线程池状态为STOP
            advanceRunState(STOP);
            // 打断所有线程
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 能获得锁说明该线程是空闲的, 此处打断空闲线程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptWorkers() {
        for (Worker w : workers)
            // 不管能不能获得锁, 都直接打断该线程
            w.interruptIfStarted();
    }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<>();
        // 将阻塞队列里的线程对象放入到taskList里
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                // 如果是DelayQueue, drainTo方法可能会没有把队列元素全放入taskList里, 此处手动删除再加入到taskList里
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 阻塞队列为空并且没有工作线程时, 设置线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 执行terminated回调
                        terminated();
                    } finally {
                        // 设置线程池状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒阻塞在awaitTermination方法的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    // 等待线程池状态变为TERMINATED
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 如果当前线程池的状态小于TERMINATED(3), 则阻塞
            while (runStateLessThan(ctl.get(), TERMINATED)) {
                if (nanos <= 0L)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

}

10.6.2. 线程池任务的提交

ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 如果工作线程小于核心线程数, 则直接执行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 将任务线程添加到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程池状态不是RUNNING, 则拒绝执行新提交的任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果阻塞队列放不下, 则启动新线程, 直到超过最大线程数
        else if (!addWorker(command, false))
            // 超过最大线程数, 拒绝执行
            reject(command);
    }

    // 新开一个线程直接执行
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                // 大于核心线程数or最大线程数
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 工作线程计数加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 将线程对象添加到workers集合中
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 执行该工作线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
}

10.6.3. 线程池任务的执行

ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}

10.7. 线程池的4种拒绝策略

  • AbortPolicy : 新的任务过来时, 直接抛出异常(默认).

  • CallerRunsPolicy : 让调用方线程去执行新的任务.

  • DiscardPolicy : 忽略掉新的任务.

  • DiscardOldestPolicy : 将队列最老的任务删除掉, 然后去执行新的任务.