当前位置:数据分析 > 一万字长文的硬核AQS源码解析

一万字长文的硬核AQS源码解析

  • 发布:2023-10-05 04:32

在阅读本文之前,需要储备的知识点如下。点击链接可直接跳转。
java线程详解
Java不能操作内存?了解不安全
在一篇文章中阅读 LockSupport

AQS简介

AQS是AbstractQueuedSynchronizer的缩写,翻译过来就是抽象队列同步器,由Doug Lea开发。说它是抽象的,是因为它提供了一个基于队列的同步器框架,并定义了一些基本的功能方法(控制状态变量、获取和释放同步状态方法以及入队和出队操作等)。具体场景只需根据需要使用即可。只需实现相应的方法即可。我们可以在锁(如ReentrantLock)和并发工具类(如CountDownLatch)中看到,内部类继承了AbstractQueuedSynchronizer,这意味着AQS是这些类的基石。说了这么多,感觉抽象越来越抽象了。我们先举几个栗子。

注:本文使用的JDK版本为JDK8。 AQS的代码非常巧妙且经典。很多细节和模块都可以抽出来单独写一篇文章。很多细节建议自己去阅读和思考。
本文主要讲独占模式的应用和原理分析。这里不再详细讨论共享模式。

应用示例

ReentrantLock的使用

三个线程获取相同的锁,并在获取后休眠 1 秒,因此三个线程每隔 1 秒打印一次输出。

公共类ReentrantLockTest {
公共静态无效主(字符串[] args){
锁测试();
}
公共静态无效lockTest(){
ReentrantLock 锁 = new ReentrantLock();
PrintThread t1 = new PrintThread(lock, "t1");
PrintThread t2 = new PrintThread(lock, "t2");
PrintThread t3 = new PrintThread(lock, "t3");
t1.start();t2.start();
t3.start();
}
}
类 PrintThread 扩展 Thread {
私人锁锁;
公共 PrintThread(Lock 锁, String 线程名) {
this.lock = 锁;
this.setName(线程名);
}
@覆盖
公共无效运行(){
锁.lock();
尝试 {
System.out.println(String.format("时间:%s,线程:%s,结果:%s",
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(www.sychzs.cn()),
Thread.currentThread().getName(),"获取锁成功"));
线程睡眠(1000);
} catch (异常 e) {
e.printStackTrace();
} 最后 {
锁.解锁();
}
}
}

打印结果如下

时间:2021-04-13 13:53:55,线程:t1,结果:获取锁成功
时间:2021-04-13 13:53:56,线程:t2,结果:获取锁成功
时间:2021-04-13 13:53:57,线程:t3,结果:获取锁成功

是因为这三个线程执行时,首先要获取锁,执行完释放锁之前的逻辑,而ReentrantLock就是独占锁,相当于锁在这三个线程之间。串行执行,彼此之间间隔1秒(注意,线程的执行顺序不一定是固定的,但线程中有1秒的睡眠操作,所以它们至少相隔1秒)

CountDownLatch的使用

主线程创建一个CountDownLatchatch = new CountDownLatch(1),3个线程持有CountDownLatch并调用CountDownLatchawait() 方法,直到主线程休眠2秒,执行CountDownLatchcountDown()方法,释放同步状态使计数值为0,唤醒在等待wait() 线程继续执行。

公共类 CountDownLatchTest {
公共静态无效主(字符串[] args)抛出InterruptedException {
CountDownLatch 锁存器 = new CountDownLatch(1);
ConcurrentThread并发线程1 = new ConcurrentThread(latch, "t1");
ConcurrentThread并发Thread2 = new ConcurrentThread(latch, "t2");
ConcurrentThread并发Thread3 = new ConcurrentThread(latch, "t3");
并发线程1.start();
并发线程2.start();
并发线程3.start();
线程睡眠(2000);
System.out.println(Thread.currentThread().getName() + " 倒计时...");
闩锁.countDown();
}
}
类 ConcurrentThread 扩展 Thread {
私有 CountDownLatch 锁存器;
公共ConcurrentThread(CountDownLatch闩锁,字符串线程名){this.latch = 锁存器;
this.setName(线程名);
}
@覆盖
公共无效运行(){
System.out.println(Thread.currentThread().getName() + " 已准备好...");
尝试 {
闩锁.await();
System.out.println(Thread.currentThread().getName() + "正在执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

打印结果如下(注意线程的执行顺序不一定是固定的)

t1 已准备就绪...
T3准备好了...
T2准备好了...
主要倒计时...
t1 正在执行...
t3 正在执行...
t2 正在执行...

这三个线程在执行过程中首先打印“...ready”,然后等待await()方法。由于CountDownLatch是一个共享锁,并且初始状态为1。主线程休眠2秒后,调用countDown()方法会将状态设置为0,即会唤醒等待队列中所有后续线程,因此会依次打印“executing...”。
这里给出两个简单的使用示例,但是可以看出都是在多线程场景下使用的,代码中并没有AQS相关的影子。那是因为这些类内部有内部类继承 AbstractQueuedSynchronizer,这些内部类处理业务逻辑。底层核心逻辑由AQS框架提供(线程排队、线程等待、线程唤醒、超时处理、中断处理等)。子类调用API来实现核心逻辑。 AQS 多线程开始发挥作用。我们来一步步分析AQS。

AQS原理解析

类UML图

图中红色连接线代表内部类,蓝色线代表继承

我们先看一下AQS相关的URL类图。从JDK源码中,我们发现AQS确实出现在两个地方。第一个是锁(如ReentrantLock等),第二个是并发。工具类(如CountDownLatch、Semaphore等),这些内部类继承AQS实现相关方法来辅助主类实现相关控制,但是我们在JDK源码中可以看到这些锁和并发工具类有很多应用程序的地方,比如队列、线程池以及一些与并发类相关的地方。

上图展示了各种方法。我们可以看到,继承AQS类的Sync内部类只需要重写并实现少量方法即可完成特定的功能。由于大部分底层通用逻辑已经在AQS类中实现,其子类只需要实现部分对外暴露的方法即可。同样,我们也可以继承AQS来实现自定义锁或者工具类。

类和方法介绍

AbstractOwnableSynchronizer

公共抽象类 AbstractOwnableSynchronizer
实现 java.io.Serialized {
私有瞬态线程 ExclusiveOwnerThread;
受保护的最终无效 setExclusiveOwnerThread(Thread 线程) {
独占所有者线程=线程;
}受保护的最终线程 getExclusiveOwnerThread() {
返回独占所有者线程;
}
}

AbstractOwnableSynchronizer 类包含 Thread 属性并提供 get 和 set 方法。该Thread对象是当前持有锁的线程。一个线程能否支持重入功能就是判断当前线程和持有锁的线程是否是同一个对象。仅同步状态值增加。线程主动释放锁后,同步状态值减小。
这个类用abstract修饰,但是类中没有抽象方法。目的是这个类不被外界直接使用,而get和set方法都用protectedfinal修饰,表示该方法可以被子类使用,但不能被子类使用。类重写。
另外,exclusiveOwnerThread用transient修饰,表示该属性不参与序列化,因为Thread没有实现Serialized接口,无法序列化。另外,进程是系统资源分配的最小单位,线程是进程执行的最小单位。线程是由操作系统分配和调度的,因此线程不能被序列化。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer类也是一个抽象类,继承自AbstractOwnableSynchronizer,并且具有设置持有锁的线程的能力。该类还被 abstract 修饰,目的就是这个类不能被外界直接使用,需要特定的子类来继承和使用。虽然实现了序列化接口,但是其内部类Node并没有实现序列化接口,所以AbstractQueuedSynchronizer类的属性head和tail都是Node类型,并且添加了。 transient关键字不参与序列化。从上面我们大概可以猜到,如果AQS是序列化的,那么它只保存一些基本属性的值,不包括线程和队列。使用过程中基本上不会被修正。它是序列化的。具体的属性和队列稍后会详细介绍。以下是 AQS 类中的一些重要方法和属性。

公共抽象类 AbstractQueuedSynchronizer
扩展 AbstractOwnableSynchronizer
实现 java.io.Serialized {
/**
* 独占模式,尝试获取同步状态,并立即返回成功或失败,需要子类实现
*/
受保护的布尔值 tryAcquire(int arg) {
抛出新的 UnsupportedOperationException();
}
/**
* 独占模式,尝试释放同步状态,成功或失败立即返回。需要子类实现。
*/
受保护的布尔 tryRelease(int arg) {
抛出新的 UnsupportedOperationException();
}
/**
* 共享模式,试图获取共享锁,需要子类实现,
* 立即返回获取到的数量值
* 0: 获取锁成功,没有剩余资源
* > 0: 获取锁成功,有剩余资源
* < 0: 获取失败
*/
受保护的 int tryAcquireShared(int arg) {
抛出新的 UnsupportedOperationException();
}
/**
* 在共享模式下,尝试释放共享锁,需要由子类来实现。如果释放成功则返回true。
*/
受保护的布尔 tryReleaseShared(int arg) {
抛出新的 UnsupportedOperationException();
}
/**
* 当前线程是否占用独占资源需要子类实现,true:是,false:否
*/
受保护的布尔值 isHeldExclusively() {
抛出新的 UnsupportedOperationException();
}
/**
* 加入团队
*/
私有节点 enq(最终节点节点) {...}
/**
* 将当前线程封装成Node逻辑还有调用enq方法入队列的逻辑
*/
private Node addWaiter(节点模式){...}
/*** [重要] 外部提供的获取锁的方法。子类调用该方法来执行获取锁的动作。
* 内部调用包括获取锁、排队、阻塞、中断等操作
*/
公共最终无效获取(int arg){...}
/**
* [重要] 锁释放方法是外部提供的。子类调用该方法来执行释放锁的动作。
* 内部包括更新状态和唤醒等待队列的第一个等待节点
*/
公共最终布尔释放(int arg){...}
/**
* [重要]双向队列头节点
*/
私有瞬态易失性节点头;
/**
* 【重要】双向队列尾节点
*/
私有瞬态易失性节点尾部;
/**
* [重要] 同步状态,控制线程是否可以获得资源,用整型变量表示。
* 添加了 volatile 以保证该变量跨多个线程的可见性
*/
私有易失性 int 状态;
/**
* 静态内部类,将等待锁的线程封装到Node中进行排队
*/
静态最终类节点{
...
}
//其他方法、属性、内部类就不列出来了
...
}

该类中没有抽象方法,但是上面提到的方法都抛出UnsupportedOperationExceptionException,表示在具体子类实现时需要重写。这也是独占模式和共享模式。要对应实现方法。
head和tail这两个Node类型属性分别代表双向链表的头和尾。如果线程无法获得锁,就会进入队列等待唤醒或者超时中断。细节将在稍后讨论。
整数状态属性比较核心,代表同步状态,用于控制线程是否需要阻塞。上面的代码没有列出其他方法,后面会详细分析一些方法的源码。

节点类

AQS类中有一个非常重要的内部类Node。我们称之为节点。这个内部类是AQS框架中线程排队的基石,非常核心。根据注释,Node类是CLHqueue的变种(CLH队列是一种单向队列,这里就不介绍了,有兴趣的可以自行搜索)。 Node 类是一个双向队列,具有内部 Node prev 和 Node next 属性。 ,分别代表前驱节点和后继节点,还有一个Thread属性,代表当前封装的线程,所以AQS队列实际上是一个由Node节点组成的双向链表,结构如下:

我们看一下Node类的属性和方法类图。

  • 节点模式:
    Node SHARED = new Node() 表示共享模式,Node EXCLUSIVE = null 表示独占模式。
  • 节点等待状态waitStatus:
    该属性字段很重要,因为它是AQS控制线程执行的关键字段。这个值的改变是由CAS操作的。其数值仅如下。
    (1) 1:CANCELLED,取消状态。有可能某个节点在等待超时后被取消或者中断,这意味着这个Node节点包含的线程还没有获取到锁。后续逻辑是否需要执行由具体业务决定。
    (2) 0:初始化值。创建节点时会默认初始化。 0 是其默认值。
    (3)-1:SIGNAL,表示该节点后续线程需要等待唤醒,后续节点的线程可以被阻塞。
    (4)-2:CONDITION,表示该节点的线程需要等待,由ConditionObject用来实现条件队列。
    (5)-3:PROPAGATE,通常为共享模式,该状态表明头节点已获得共享资源,可以向后传播。等待队列中的其他节点也可以获得共享资源。
  • Thread线程属性对象
    AQS框架将当前正在获取同步状态的线程包装成Node节点的一个属性,并根据Node节点的waitStatus状态来控制是唤醒当前线程继续尝试获取锁还是取消线程。

队列

AQS内部的两个变量head代表队列的头节点,tail代表队列的尾节点。这是一个双向队列。正如Node类所介绍的,头点和尾点如下图所示。

注意:头节点比较特殊。队列中需要被唤醒的线程从头节点的下一个节点开始。当队列初始化时,会放置一个新的 Node() 对象。属性线程未分配值。后续排队的线程在醒来时,会将自己设置为head,并将线程属性设置为null。所以头节点可以这么理解。当头节点初始化时,它是一个虚拟节点。这是没有用的。它只是充当队列头标识符。当队列中有线程在排队时,说明头节点已经是获取锁的线程的节点。等待这个线程执行完毕后,需要在www.sychzs.cn之后唤醒线程才能继续执行。这就是排队和叫醒的逻辑。

同步状态

在AQS类中,有一个state属性,描述如下

 /**
* 同步状态。
*/
私有易失性 int 状态;

state是一个整型变量,称为同步状态,或者锁的数量。用 volatile 进行修改,保证线程之间的可见性。所有线程能否获得锁资源取决于该字段的值。操作来确定。对于排他锁来说,初始情况state=0,表示当前资源空闲,可以被线程获取锁。如果state>0,说明有线程已经占用了资源,后续线程(非持有锁的线程)需要进入Queue,不会出现<0的情况,因为state时如果将exclusiveOwnerThread设置为null锁释放过程中达到=0,所以当多次调用锁释放方法时,如果exclusiveOwnerThread不是当前线程,就会抛出IllegalMonitorStateException异常。

公平锁定与非公平锁定

  • 公平锁:

多个线程获取锁时,按照请求的顺序排队,不存在跳队的情况。
常用的实现方法如下:

最终无效锁() {
获取(1);
}

acquire方法是AQS锁获取方法。多线程在竞争获取锁时会排队。

  • 不公平锁:

多个线程获取锁时,首先不会按照请求的顺序排队,而是先尝试获取锁,这就是抢占式获取。如果获得,则该线程是持有锁的线程并且可以执行其逻辑。如果没有获取到锁,就会进入排队过程,所以有可能后来到达的线程先于等待队列中的线程获取到锁。
常用的实现方法如下:

最终无效锁() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
别的
获取(1);
}

从代码中可以看到,在不公平的情况下,线程会首先尝试使用cas方法来设置状态。如果设置成功,就会获得锁。如果设置失败,就会排队等待锁获取过程。
因此,两者的区别在于锁是否会被抢占。当设置为公平锁时,每个线程获取锁的概率是相同的。每个线程都会首先检查等待队列是否为空。如果为空,则直接获取锁。如果不为空,则自动排队等待获取锁;设置为不公平锁定时,所有线程都会先尝试竞争锁,不会按顺序等待。如果无法抢到锁,就会使用类似于公平锁的方法来获取锁。
那么为什么要这样设计呢?这两种类型分别在什么场景下使用?

  1. 恢复挂起的线程和获取实际的锁之间仍然存在时间差。从开发者的角度来看,这个时间可以忽略不计,但是从CPU的角度来看,这个时间差异还是很明显的。因此,非公平锁可以充分利用CPU时间片,最小化CPU空闲状态时间
  2. 使用多线程时一个非常重要的考虑因素是线程切换的成本。使用非公平锁时,当一个线程请求锁获得同步状态,然后释放同步状态时,不需要考虑是否有前驱节点,因此刚刚释放锁的线程获得的概率此时再次同步状态变得很高,因此线程开销减少了
    看起来上面提到的两点是非公平锁比较好,但是非公平锁也有自己的问题,可能会导致排队的线程长时间排队而没有机会获得锁。这就是传说中的“锁饥饿”。如果使用超时获取锁的话,可能会导致队列中大面积的线程超时而无法获取锁。
    那么什么时候使用公平锁,什么时候使用非公平锁呢?
    如果想要更高的吞吐量,非公平锁比较合适,因为节省了大量的线程切换时间,吞吐量自然会提高;否则,使用公平锁,每个人都会按照请求的顺序排队。

专属锁加锁流程

以ReentrantLock公平锁方式无超时、不间断获取锁为例。
整体流程如下。这将有助于我们首先了解整个过程。会涉及到子流程,单独给出流程图。

获取锁的主要代码如下。这也是调用锁的入口点。逻辑见代码注释:

公共最终无效获取(int arg){
/*(1)tryAcquire方法由子类实现,尝试获取锁。
返回true不涉及后续判断,说明已经获取到锁。返回false表示尚未获取锁,后续的队列等待过程完成。
(2)addWaiter方法将当前线程封装成一个Node对象并返回,其中还包括加入队列的操作。
(3) acquireQueued方法主要尝试再次获取锁。
如果获取到则返回是否被中断。如果没有获取到,则需要确认线程是否需要阻塞以及阻塞操作。
最后返回释放中断标志。
(4) selfInterrupt中断当前线程,因为LockSupport.park阻塞线程时不会响应中断。
但通过Thread.interrupted()方法,可以获得当前线程是否被中断的标志。
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
自我中断();
}

这里tryAcquire(arg)尝试通过AQS子类来获取锁,其他三个方法(acquireQueued、addWaiter、selfInterrupt)都是AQS实现的。这也是一种模板方法设计模式。
tryAcquire(arg)进程尝试获取锁的具体实现逻辑。

代码如下:

受保护的最终布尔值 tryAcquire(int acquires) {
// 获取当前线程
最终线程当前 = Thread.currentThread();
// 获取AQS的同步状态值state
int c = getState();
// state为0,表示没有线程持有锁,可以尝试获取锁
如果(c==0){
/*
(1) hasQueuedPredecessors方法判断队列中当前线程的Node之前是否还有其他Node。返回true表示还有其他线程在等待并且尝试获取锁失败。返回 false 表示前面没有线程在等待。
您可以继续执行逻辑。这里首先判断state=0,没有直接进行cas操作,然后判断队列中是否有等待线程。
充分体现公平性
(2) 如果compareAndSetState(0, acquires)也设置成功,则说明加锁成功。
设置exclusiveOwnerThread为当前线程,返回true表示获取锁成功。
*/
if (!hasQueuedPredecessors() &&
CompareAndSetState(0, 获取)) {
setExclusiveOwnerThread(当前);
返回真;
}
}
/*
这个else if逻辑主要是可重入的判断和处理。
如果持有锁的线程是当前线程,state = state + acquires
*/
否则 if (当前 == getExclusiveOwnerThread()) {
int nextc = c + 获取;
if (下一个 < 0)
throw new Error("超过最大锁定计数");
设置状态(下一个);
返回真;
}
返回假;
}

addWaiter(Node.EXCLUSIVE)流程,将线程打包成Node节点的逻辑,入队入队,返回打包好的Node节点的逻辑。

代码如下:

私有节点addWaiter(节点模式) {
// 将当前节点封装成Node对象
Node 节点 = new Node(Thread.currentThread(), mode);
// 尝试enq的快速路径;失败时备份到完整 enq节点 pred = tail;
if (pred != null) {
/*
(1)当队列不为空时,首先尝试将节点插入到队列末尾。
如果compareAndSetTail返回成功,说明该节点入队列成功,直接返回。否则需要进入队列进程。
(2)主要是将当前节点的prev指向原tail,原tail节点的next指向当前节点。
这样就完成了节点的入队。
*/
节点.prev = pred;
if (compareAndSetTail(pred, 节点)) {
www.sychzs.cn = 节点;
返回节点;
}
}
// 如果尝试直接插入到队列尾部失败,就会进入队列逻辑。
enq(节点);
// 返回当前线程封装的Node对象
返回节点;
}
私有节点 enq(最终节点节点) {
// 加入队伍时使用的for无限循环是一个旋转的过程,直到成功
为了 (;;) {
节点 t = 尾部;
/*
如果队列尾部为空,则说明队列还没有初始化。首先初始化头节点,然后尾部也指向头。
完成初始化队列,虽然只有一个节点,但head和tail都有了指向
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
如果队尾tail不为空,则采用cas方式将当前node插入队尾,
成功则返回,否则一直自旋尝试直到成功
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
www.sychzs.cn = node;
return t;
}
}
}
}

线程阻塞逻辑,acquireQueued(final Node node, int arg)具体实现流程

代码如下:

final boolean acquireQueued(final Node node, int arg) {
/*
failed变量表示获取锁是否失败,初始化为true表示失败,只有在获取到锁时failed为false,
为true时表示获取锁过程中异常,finally块里的判断是否需要取消当前这个线程获取锁的相关逻辑,
包括队列的调整以及后继Node里线程的唤醒
*/
boolean failed = true;
try {
/*
interrupted变量表示当前线程是否被中断的标识,true:线程被中断,false:线程未被中断,
这个方法整体返回的就是这个值,用来确定后续是否要调用selfInterrupt()方法中断当前线程
*/
boolean interrupted = false;
// for无限循环,自旋处理
for (;;) {
// 取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是head并且tryAcquire尝试获取到锁了,则将当前线程设置成head
if (p == head && tryAcquire(arg)) {
setHead(node);
www.sychzs.cn = null; // help GC
failed = false;
return interrupted;
}
/*
这里就是线程阻塞等待的核心了,尝试获取锁失败时,判断是否需要阻塞,
需要阻塞的话就调用LockSupport.park方法阻塞当前线程
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
在不可中断模式下,failed的值始终会是false,因为虽然被中断了,
但是当前线程还是获取到锁了,走正常的后续处理逻辑,finally这里的逻辑就不会走了
*/
if (failed)
cancelAcquire(node);
}
}

尝试获取锁失败时是否需要阻塞当前线程判断流程,shouldParkAfterFailedAcquire(Node pred, Node node)逻辑

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
当前线程的前一个节点的waitStatus状态是Node.SIGNAL,
则说明前一个线程如果获取到锁并且执行完成后释放了锁需要唤醒后续节点,
从另一个角度来说当前线程自然要阻塞等待了
*/
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
当前线程的前一个节点的waitStatus状态是Node.CANCELLED时,说明前驱节点已经取消获取锁了
需要从当前节点一直向前查找知道节点没有被取消,
然后把找到的第一个没有被取消的节点的next指向当前节点,这样就把当前节点前取消状态的都删掉
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
www.sychzs.cn = node;
} else {
/*
前一个节点的waitStatus状态还是0,或者是共享锁的传播状态PROPAGATE时,
则会把前一个节点的waitStatus状态改成Node.SIGNAL
所以是后一个节点排队时把前一个节点waitStatus改成Node.SIGNAL,
表示前一个节点执行完释放锁了要走唤醒后续节点的逻辑,
依次类推,队列里只有最后一个Node节点的waitStatus是0,因为它没有后续节点,
也不需要执行唤醒操作,其余在没有被中断状态下应该都是Node.SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/*
阻塞当前线程调的就是LockSupport.park,原理之前文章有讲过,这就是线程阻塞等待的核心实现了
线程被LockSupport.park了不会响应中断,
如果线程被中断了需要用Thread.interrupted()获取当前线程的中断标识
*/
LockSupport.park(this);
return Thread.interrupted();
}

独占锁释放锁流程

以ReentrantLock释放锁为例,释放锁不区分公平锁还是非公平锁,释放的逻辑是一样的,整体流程如下。

release(int arg)这是AQS里定义的模板方法,主要释放锁代码如下,这也是调用释放锁的入口,逻辑看代码注释:

public final boolean release(int arg) {
// 尝试释放锁,由子类实现具体逻辑
if (tryRelease(arg)) {
Node h = head;
// 头节点不为null,并且waitStatus!=0,说明要唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 返回锁是否空闲标识,其实就是tryRelease(arg)的返回结果
return false;
}

tryRelease(int releases)是尝试释放锁的逻辑,AQS定义的方法,默认是抛异常,子类根据具体场景实现逻辑。以下是ReentrantLock的内部类Sync的具体实现,返回true表示现在锁空闲了,返回false表示锁现在还被占用。

protected final boolean tryRelease(int releases) {
// 计算释放releases后,新的state值
int c = getState() - releases;
// 如果当前释放锁的线程不是持有锁的线程直接抛异常,只有持有锁的线程才能释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
/*
如果释放releases后,新的state是0,那么说明锁就空闲了,将free标识赋值为true,
然后将exclusiveOwnerThread赋值为null
*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置state新值,只有持有锁的线程才可操作,无需cas
setState(c);
return free;
}

unparkSuccessor(Node node) 这个方法就是关键的唤醒后续等待队列里的线程关键方法。通过调用LockSupport.unpark方法将阻塞的线程唤醒继续执行。

private void unparkSuccessor(Node node) {
// node是当前释放锁的线程,它的waitStatus如果<0就把他置成0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
如果node的next节点是null或者取消了,则从队尾往前查找,一直找到node节点,
获得第一个未被取消的节点
*/
Node s = www.sychzs.cn;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到第一个未被取消的节点,并唤醒线程,使其继续执行
if (s != null)
LockSupport.unpark(s.thread);
}

这里有一个比较关键的地方,如果node的next节点是null或者取消状态,则从队尾往前查找,一直找到node节点,为什么会从后往前遍历?
这里考虑了并发的场景,从后往前不会导致node丢失,具体我们可以从addWaiter方法看。

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
www.sychzs.cn = node;
return node;
}
}
enq(node);
return node;
}

这里的第6、7、8行就是关键了,先设置prev节点,这样就保证了所有的节点都有前驱节点,第7、8这两行没有保证原子操作,如果cas成功了,但是刚好cpu时间片切换,第8行未执行,那么pred的next就是空了,所以从前往后可能会漏节点,从后往前是完整的队列,举个栗子:
(1)假如释放锁的线程是tail尾节点,刚好unparkSuccessor时,执行到www.sychzs.cn为空的判断之前,cpu时间片切换了。
(2)有个线程调用了addWaiter方法,把新node的prev指向了tail,cas设置尾节点也成功了,就在这儿cpu又切换了,那么原tail节点的next还没有设置。
(3)cpu再切回到unparkSuccessor的www.sychzs.cn为空判断时,这时候他的next是null(因为next指针还没有指向新node节点),实际上后面还有一个node节点,这样就会漏掉节点数据了。
如果从后往前的话,每一个node的前驱肯定是有值的,但是高并发情况下不能保证每一个node的后继节点也能及时连接上。所以从后往前就确保了能遍历到每一个节点。
也就是从等待队列里阻塞的方法恢复执行,返回线程是否中断标识,然后再继续尝试获取锁。

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

到这里,基本上已经把独占锁的获取锁和释放锁的流程和逻辑都讲完了,AQS基本已经把大部分的核心功能帮我们写好了,我们只用去写或利用他已有的方法,实现我们自己的逻辑即可,就比如以上讲到的独占锁的获取和释放,其实我们自己仅仅具体实现了tryAcquire(int acquires)、tryRelease(int releases)这两个方法,花了大篇幅讲的都是AQS的流程和逻辑,由此,真正的感受到了AQS的巧妙设计。

超时&中断处理

理解了上面的独占锁的加锁流程,对于超时和中断处理的理解就很容易了,这两种其实都有线程中断抛出异常逻辑,另外将带超时时间获取锁和可响应中断获取锁这两种方式关于获取结果交给开发人员自行处理,既体现了设计的灵活性也可让开发人员根据具体业务场景具体处理,还是以ReentrantLock来讲解。

超时

关于超时,就是在指定的时间内未获取到锁就返回获取失败,在指定的时间内获取到了锁返回成功,有两种,一个是尝试获取,例如:tryLock(),不管有没有获取到立即返回,相当于超时是0,另一种是指定超时时间,如果指定时间未获取到锁就返回false,例如:tryLock(long timeout, TimeUnit unit),下面详细讲解下。

  • tryLock()
public boolean tryLock() {
// 入口方法,是以非公平方式尝试获取锁,返回true:获取成功,false:获取失败
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state值是0时,表示暂时锁空闲,尝试cas赋值,也可以理解成尝试加锁
if (c == 0) {
// cas成功,则说明加锁成功,设置当前线程为持有锁的线程,返回true:获取成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程如果是持有锁的线程,可重入,判断并设置state=state+acquires,返回true:获取成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 尝试没有获取到锁,当前线程也不是持有锁的线程,直接返回false:获取失败
return false;
}

tryLock()的实现逻辑还是挺简单了,不带超时相关设置,相当于超时时间是0,要么立即成功,要么立即失败,不涉及复杂的入队、阻塞、唤醒、取消相关逻辑。单纯的看state=0说明空闲cas成功则立即获取锁,或者持有锁的线程是当前线程,这样就可重入,获取锁成功,其他情况均尝试获取锁失败,直接返回。

  • tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
/*
主入口方法,带超时时间尝试获取锁,获取到返回true,未获取到返回false,
注意还有可能抛出被中断异常InterruptedException
*/
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断如果线程被中断,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//还是先尝试获取锁,获取成功则返回true,获取失败执行后面的doAcquireNanos方法,带超时等待
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* 这个方法就是带超时等待获取锁的核心实现,
* 大体流程上跟acquireQueued(final Node node, int arg)这个方法差不多
* 逻辑里调用了相同的方法的就不再详细阐述了,只说不同的核心关键逻辑
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 先入队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果头节点是head并且尝试获取锁成功则返回true
if (p == head && tryAcquire(arg)) {
setHead(node);
www.sychzs.cn = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 方法执行到这里已经超时了,直接返回false
if (nanosTimeout <= 0L)
return false;
/*
以下的逻辑是关键实现超时返回的逻辑
先判断是否需要阻塞,再判断超时时间是否大于1000纳秒即0.001 毫秒,
这个时间可以说非常短了,但对于高速CPU来说还是需要一定的时间,
如果这两个条件都成功,则阻塞,否则自旋
阻塞调用的是LockSupport.parkNanos(this, nanosTimeout);精确到纳秒级的阻塞,
并且第一个参数是this,表明了这个线程具体阻塞在哪个对象上,通过jstat可查看到
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 判断如果线程被中断,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
/*
这里可能会走,虽然LockSupport.parkNanos不响应中断,
但是最后的逻辑判断了当前线程是否中断的标识,如果中断了则会抛InterruptedException异常,
那么failed变量的值还是true,需要走取消的逻辑,将当前线程的Node从队列去掉相关逻辑处理
*/
if (failed)
cancelAcquire(node);
}
}

中断

上文已经说过了,如果线程进入等待队列并且阻塞了,那么它是不会响应中断的,虽然阻塞队列不响应中断,但是被唤醒后,线程的中断标识是可以获取到的,所以可以通过该标识来处理是否需要主动抛异常中断处理。

需要注意中断并不是实时感知的,虽然被中断了如果没有被唤醒,还是需要继续等待,直到被唤醒后,获取中断标识来做处理。

我们还是以ReentrantLock为例,lockInterruptibly()这个就是可以响应中断的方法。

public void lockInterruptibly() throws InterruptedException {
// sync这个对象继承了AbstractQueuedSynchronizer,这里直接调用的是AQS的方法了。
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先判断下如果线程已经被中断了,直接抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 尝试获取锁没有成功时,才进入可响应中断获取锁的方法里
doAcquireInterruptibly(arg);
}
/**
* 这个方法就是获取锁时可响应中断核心实现,
* 大体流程上跟tryLock(long timeout, TimeUnit unit)这个方法差不多
* 逻辑里调用了相同的方法的就不再详细阐述了,只说不同的核心关键逻辑
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
www.sychzs.cn = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
主要的处理就在这里了,判断需要阻塞并且阻塞被唤醒后,
如果中断标识为true则抛出InterruptedException异常
*/
throw new InterruptedException();
}
} finally {
/*
这里可能会走,如果线程被中断了,抛出InterruptedException异常后,failed变量还是true
需要走取消的逻辑,将当前线程的Node从队列去掉相关逻辑处理
*/
if (failed)
cancelAcquire(node);
}
}

AQS的使用

AQS是一个抽象队列同步框架,支持独占模式和共享模式,由于AQS是一个抽象类,仅仅需要子类去实现具体的获取锁释放锁方法,锁的获取和释放入口统一由AQS提供,如下所示。

独占模式

  • 获取锁入口

(1)不响应中断

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

(2)响应中断

public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

独占模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquire(arg)方法,尝试获取资源,成功则返回true,失败则返回false,其他都由AQS提供。

  • 释放锁入口
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

独占模式下,释放锁时子类仅需要实现tryRelease(arg)方法,尝试释放资源,成功则返回true,失败则返回false,其他都由AQS提供。

共享模式

  • 获取锁入口

(1) 不响应中断

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

(2) 响应中断

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

共享模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquireShared(arg)方法,尝试获取资源,返回值<0表示失败;=0表示成功,但没有剩余可用资源;>0表示成功,且有剩余资源,其他都由AQS提供。

  • 释放锁入口
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

共享模式下,释放锁时子类仅需要实现tryReleaseShared(arg)方法,尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false,其他都由AQS提供。

自定义锁的实现

使用AQS自定义锁时,子类可以实现Lock接口(因为Lock定义了获取锁和释放锁的方法,也可以不实现这个接口,自己定义方法),然后实现尝试获取锁和释放锁的方法即可。

需求

实现一个独占不响应中断不可重入的公平锁。

分析

独占锁需要实现tryAcquire(arg)、tryRelease(arg)这两个方法。不可重入,则要判断只要有线程占用锁,不管是不是当前线程都返回获取失败,公平锁说明尝试获取锁时要先看队列里是否有等待获取锁的Node。

实现

其实也就是ReentrantLock的另一个版本

  1. 定义一个实现需求的MyLock类。
  2. 定义MyLock类的加锁方法lock()和释放锁方法unLock()。
  3. 在MyLock类内部定义一个Sync类继承AbstractQueuedSynchronizer类,实现tryAcquire(int arg)和tryRelease(int arg)方法。
  4. MyLock类中定义一个Sync的变量,构造函数中实例化Sync类,在lock方法调用sync.acquire(1),在unlock方法中调用sync.release(1)

这样锁的定义和实现都完成了,代码如下。

public class MyLock {
private Sync sync;
public MyLock() {
sync = new Sync();
}
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (getState() == 1) {
free = true;
setExclusiveOwnerThread(null);
setState(0);
}
return free;
}
}
public final void lock() {
sync.acquire(1);
}
public void unLock() {
sync.release(1);
}
}

测试

  • 多个线程获取锁
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
List list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(new Thread(() -> {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "执行业务逻辑");
Thread.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.unLock();
}
}, "t" + i));
}
list.forEach(Thread::start);
}
}

结果输出:

2023-06-08T11:35:27.822:t0将要加锁
2023-06-08T11:35:27.822:t4将要加锁
2023-06-08T11:35:27.822:t3将要加锁
2023-06-08T11:35:27.822:t1将要加锁
2023-06-08T11:35:27.822:t2将要加锁
2023-06-08T11:35:27.823:t0加锁成功
2023-06-08T11:35:27.823:t0执行业务逻辑
2023-06-08T11:35:27.828:t0解锁成功
2023-06-08T11:35:27.828:t4加锁成功
2023-06-08T11:35:27.828:t4执行业务逻辑
2023-06-08T11:35:27.831:t4解锁成功
2023-06-08T11:35:27.831:t3加锁成功
2023-06-08T11:35:27.831:t3执行业务逻辑
2023-06-08T11:35:27.836:t3解锁成功
2023-06-08T11:35:27.836:t1加锁成功
2023-06-08T11:35:27.836:t1执行业务逻辑
2023-06-08T11:35:27.837:t1解锁成功
2023-06-08T11:35:27.837:t2加锁成功
2023-06-08T11:35:27.837:t2执行业务逻辑
2023-06-08T11:35:27.845:t2解锁成功
  • 线程是否可重入
class Test {
public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
myLock.lock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "再次加锁成功");
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "执行业务逻试");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.unLock();
}
},"t1").start();
new Thread(() -> {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "将要加锁");
myLock.lock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "加锁成功");
try {
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "执行业务逻试");
Thread.sleep(new Random().nextInt(10));
myLock.unLock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "解锁成功");
myLock.lock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "再次加锁成功");
myLock.unLock();
System.out.println(www.sychzs.cn() + ":" + Thread.currentThread().getName() + "再次解锁成功");
}
catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}

有两种可能的输出:

  1. t1先获取锁成功

这种情况输出如下,t1先加锁成功,t2等待,实现了多线程间的加锁互斥,另外t1加锁成功后有再次加锁,发现还是等待,这说明锁不可重入,功能实现,这两个线程都将一直等下去。

2023-06-08T11:47:57.016:t1将要加锁
2023-06-08T11:47:57.017:t1加锁成功
2023-06-08T11:47:57.016:t2将要加锁
  1. t2先获取锁成功

这种情况输出如下,t2先加锁成功,正常执行业务逻辑后释放锁,t2释放锁后线程可正常结束。t2释放了锁,则t1加锁成功,当t1想第二次再加锁时,发现需要等待,锁不可重入。

2023-06-08T11:49:28.492:t2将要加锁
2023-06-08T11:49:28.492:t1将要加锁
2023-06-08T11:49:28.493:t2加锁成功
2023-06-08T11:49:28.493:t2执行业务逻试
2023-06-08T11:49:28.501:t2解锁成功
2023-06-08T11:49:28.501:t1加锁成功

通过这两个例子,我们可以看出,这种独占锁、不可重入的情况下,lock()和unlock()方法必须配对使用,不能连续加锁和释放锁。

JUC包下AQS子类锁的实现

java.util.concurrent包下有几个基于AQS实现的锁,如下所示,有了以上知识基础,再理解这些锁是很容易的,了解详细可参考具体源码实现。

类型 描述
ReentrantLock 独享锁 可重入锁
ReentrantReadWriteLock 独享锁、共享锁兼备 ReadLock是共享锁,WriteLock是独享锁
CountDownLatch 共享锁 不可重复使用
Semaphore 共享锁 可重复使用
CyclicBarrier 共享锁 使用ReentrantLock实现的共享锁,可重复使用

总结

主要讲解了AQS的独占模式,提到了一些共享模式相关的知识,有了独享模式的基础,理解共享模式并不难,还有关于Condition相关的知识没有讲,所以关于共享模式和Condition相关的大家可以自行去阅读源码,后续有机会也会出相关的文章。
还有另外一个类AbstractQueuedLongSynchronizer,这个类是AbstractQueuedSynchronizer的一个变种,只是把state的类型从int变成long了,所有涉及跟这个state相关的操作参数和返回都改成long类型了,理论上使用这个类实现的锁可以超过Integer.MAX_VALUE的限制,最大的可获取锁的次数就变成Long.MAX_VALUE,这个在如多级锁和需要64位状态时会非常有用,目前在JDK里并没有发现使用的地方,而在HikariCP连接池com.zaxxer.hikari.util.QueuedSequenceSynchronizer这个类内部使用到了这个类,感兴趣的可自行阅读。
AQS的设计确实相当巧妙、逻辑非常严谨,在多线程下使用,已尽可能最大限度支持高并发操作,通过对源码的学习,我们了解了锁的设计,大部分的工作都由AQS完成(包括线程的包装排队、阻塞、唤醒、超时处理、中断处理等),剩下的小部分代码由开发者根据业务场景具体实现(尝试获取锁,释放锁),不得不佩服如此精美巧妙的设计和实现,Doug Lea,我永远的神!

相关文章

最新资讯