学习笔记
1、LockSupport
要想弄明白AbstractQueuedSynchronizer(AQS)的工作原理,首先要了解一下JUC包下的LockSupport工具类,该类私有化了构造方法,所有方法均为静态方法以便AQS调用。
1.1 工具方法
- park():暂停当前线程
- unpark():恢复当前线程
- parkNanos():暂停当前线程,不过有超时限制
- parkUntil():暂停当前线程,直到某个Deadline
1.2 优点
- 相较于被锁对象调用wait()和notify()方法,对线程调用工具方法更符合逻辑,便于理解
- 操作更加精准,对线程操作可以精确唤醒某个线程,而notify()是随即唤醒一个线程
2、Node.waitStatus
Node是AQS类的一个静态内部类,用于存储队列中的一个线程,其内部有一个属性waitStatus,代表节点的状态
Name | Value | Comment |
---|---|---|
CANCELLED | 1 | 当前节点取消调度,当超时或响应中断后进入此状态 |
SIGNAL | -1 | 后继节点等待当前节点唤醒,后继节点入队时,会将前置节点置为该状态 |
CONDITION | -2 | 当前节点等待在Condition上,等待其他线程调用Condition.signal() |
PROPAGATE | -3 | 共享模式,前继节点不仅会唤醒其后继节点,还会唤醒其后继节点的后继节点 |
INITIAL | 0 | 新节点入队时的默认状态 |
3、AQS
AQS使用了模板方法模式,其模板方法分为独占式和共享式。
3.1 acquire()
//以独占方式获取,忽视中断,它会至少调用一次tryAcquire()直到成功获取并返回。
//否则线程进入队列,通过调用tryAcquire()重复阻塞和解除阻塞,直到成功。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
tryAcquire()方法需要自己实现具体细节
-
addWaiter()根据给定的模式将当前线程入队,为了并发入队安全采用了Unsafe类的compareAndSetTail()方法保证原子性进入双向队列,返回值为新当前线程入队后作为队尾的Node
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
-
acquireQueued()会接收这个线程的Node,获取它的前一个节点,不断循环检查其前一个节点是否为队头,如果是队头调用tryAcquire()尝试获取锁,如果本次获取锁失败按照参数决定是否检查有没有中断,没有中断则进入下一次循环,如果获取到锁或者被中断,返回值为是否被中断的Boolean的值
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
-
如果acquireQueued()返回值为true,即被中断,则最终进入if()的内部,执行selfInterrupted()。如果acquireQueued()返回值为false,则直接离开if语句块,直接到达acquire()结束位置,acquire()方法执行结束
3.2 acquireShared()
//以共享方式获取,忽视中断,它会至少调用一次tryAcquireShared()直到成功获取并返回。
//否则线程进入队列,通过调用tryAcquireShared()重复阻塞和解除阻塞,直到成功。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
-
acquireShared()需要用户自行实现,返回值为int
- 返回值为负数,目前排队线程数
- 返回值为零,本次获取成功,但剩余资源为0
- 返回值为正数,本次获取成功,剩余资源数为返回值的数值
-
doAcquireShared(),当资源不足时,进入if语句块入队,该方法类似于独占式获取中的acquireQueued(addWaiter())方法
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
3.3 acquireInterruptibly()
//第一次tryAcquire()方法前检查有没有中断
//这样就不会像acquire()方法那样在acquireQueued()方法前一定会tryAcquire()一次
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
3.4 release()
//以独占方式模式实现,通过tryRelease()方法解除一或多个线程的阻塞
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryRlease()方法需要用户自行实现
- 如果tryRelease()返回为真,则获取队列头,如果队列头不为空且队列头的状态不为0,调用LockSupport.unpark()方法来激活线程
3.5 releaseShared()
//以共享模式释放,通过调用tryReleaseShared()解除一或多个线程的阻塞
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
-
tryReleaseShared()方法需要用户自行实现
-
doReleaseShared()不同于doRelease(),doRelease()只需要查看队头是否需要唤醒,doReleaseShared()需要关心队列中所有能唤醒的并保证唤醒信号在队列中的传播
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }