学习笔记

1、LockSupport

要想弄明白AbstractQueuedSynchronizer(AQS)的工作原理,首先要了解一下JUC包下的LockSupport工具类,该类私有化了构造方法,所有方法均为静态方法以便AQS调用。

1.1 工具方法

  • park():暂停当前线程
  • unpark():恢复当前线程
  • parkNanos():暂停当前线程,不过有超时限制
  • parkUntil():暂停当前线程,直到某个Deadline

1.2 优点

  • 相较于被锁对象调用wait()和notify()方法,对线程调用工具方法更符合逻辑,便于理解
  • 操作更加精准,对线程操作可以精确唤醒某个线程,而notify()是随即唤醒一个线程

2、Node.waitStatus

Node是AQS类的一个静态内部类,用于存储队列中的一个线程,其内部有一个属性waitStatus,代表节点的状态

Name Value Comment
CANCELLED 1 当前节点取消调度,当超时或响应中断后进入此状态
SIGNAL -1 后继节点等待当前节点唤醒,后继节点入队时,会将前置节点置为该状态
CONDITION -2 当前节点等待在Condition上,等待其他线程调用Condition.signal()
PROPAGATE -3 共享模式,前继节点不仅会唤醒其后继节点,还会唤醒其后继节点的后继节点
INITIAL 0 新节点入队时的默认状态

3、AQS

AQS使用了模板方法模式,其模板方法分为独占式和共享式。

3.1 acquire()

//以独占方式获取,忽视中断,它会至少调用一次tryAcquire()直到成功获取并返回。
//否则线程进入队列,通过调用tryAcquire()重复阻塞和解除阻塞,直到成功。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire()方法需要自己实现具体细节

  • addWaiter()根据给定的模式将当前线程入队,为了并发入队安全采用了Unsafe类的compareAndSetTail()方法保证原子性进入双向队列,返回值为新当前线程入队后作为队尾的Node

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
  • acquireQueued()会接收这个线程的Node,获取它的前一个节点,不断循环检查其前一个节点是否为队头,如果是队头调用tryAcquire()尝试获取锁,如果本次获取锁失败按照参数决定是否检查有没有中断,没有中断则进入下一次循环,如果获取到锁或者被中断,返回值为是否被中断的Boolean的值

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
  • 如果acquireQueued()返回值为true,即被中断,则最终进入if()的内部,执行selfInterrupted()。如果acquireQueued()返回值为false,则直接离开if语句块,直接到达acquire()结束位置,acquire()方法执行结束

3.2 acquireShared()

//以共享方式获取,忽视中断,它会至少调用一次tryAcquireShared()直到成功获取并返回。
//否则线程进入队列,通过调用tryAcquireShared()重复阻塞和解除阻塞,直到成功。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  • acquireShared()需要用户自行实现,返回值为int

    • 返回值为负数,目前排队线程数
    • 返回值为零,本次获取成功,但剩余资源为0
    • 返回值为正数,本次获取成功,剩余资源数为返回值的数值
  • doAcquireShared(),当资源不足时,进入if语句块入队,该方法类似于独占式获取中的acquireQueued(addWaiter())方法

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

3.3 acquireInterruptibly()

//第一次tryAcquire()方法前检查有没有中断
//这样就不会像acquire()方法那样在acquireQueued()方法前一定会tryAcquire()一次
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

3.4 release()

//以独占方式模式实现,通过tryRelease()方法解除一或多个线程的阻塞
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • tryRlease()方法需要用户自行实现
  • 如果tryRelease()返回为真,则获取队列头,如果队列头不为空且队列头的状态不为0,调用LockSupport.unpark()方法来激活线程

3.5 releaseShared()

//以共享模式释放,通过调用tryReleaseShared()解除一或多个线程的阻塞
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • tryReleaseShared()方法需要用户自行实现

  • doReleaseShared()不同于doRelease(),doRelease()只需要查看队头是否需要唤醒,doReleaseShared()需要关心队列中所有能唤醒的并保证唤醒信号在队列中的传播

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }