ReentrantLock

ReentrantLock 特性

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。

// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
	synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
	// 1.初始化选择公平锁、非公平锁
	ReentrantLock lock = new ReentrantLock(true);
	// 2.可用于代码块
	lock.lock();
	try {
		try {
			// 3.支持多种加锁方式,比较灵活; 具有可重入特性
			if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
		} finally {
			// 4.手动释放锁
			lock.unlock()
		}
	} finally {
		lock.unlock();
	}
}

ReentrantLock 简介

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。ReentrantLock 支持公平锁和非公平锁,ReentrantLock 的底层就是由 AQS 来实现的。

构造函数

  • 无参构造,非公平锁
  • 有参构造,true 为公平锁,false 为非公平锁。
public ReentrantLock() {  sync = new NonfairSync(); }
public ReentrantLock(boolean fair) {  sync = fair ? new FairSync() : new NonfairSync(); }

公平锁和非公平锁

公平锁就是,根据申请锁的顺序赋予锁;非公平锁与其相反。关于公平锁和非公平锁的原理分析,可参考《不可不说的 Java“锁”事》。

ReentrantLock 中非公平锁的加锁:如果加锁成功了,那么就可以让当前线程运行;那么加锁失败了,那么就需要记录下这个任务,这就涉及到 AQS 的实现。AQS 的底层是一个双向队列,没有获取锁失败的线程就会记录到这个队列中。

ReentrantLock 中公平锁的加锁:直接添加到 AQS 的双向队列中。当然也会先判断双向队列是否为空,为空且可以运行则直接运行当前线程。

// java.util.concurrent.locks.ReentrantLock#NonfairSync
// JDK1.8
static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))              // 尝试使用CAS机制来抢占锁
			setExclusiveOwnerThread(Thread.currentThread());  // 抢占成功后则设置为当前线程执行
		else
			acquire(1);                       // 失败,则调用AQS的acquire方法进行后续处理
		}
 ...
}
 
static final class FairSync extends Sync {
 ...
	final void lock() {
		acquire(1);
	}
 ...
}

AQS 原理

AQS 源码的整体框架

以当前使用的 JDK11 为例。

394-575 行定义了 AQS 的队列节点 Node。

580-613 行定义了 AQS 的同步状态 state。

615-780 行定义了 AQS 的排队方法(615 行进行了”Queuing utilities”注释)

782-887 行定义了 AQS 的为获取锁的方式提供的一些辅助方法(782 行进行了”Utilities for various versions of acquire”注释)

889-1087 行定义了 AQS 的多种获取锁的方式(889 行进行了”Various flavors of acquire”注释)

1089-1387 行定义了 AQS 的主要的公开方法,包含获取锁和释放锁(1089 行进行了”Main exported methods”注释)

1389-1565 行定义了 AQS 的队列相关判断方法(1389 行进行了”Queue inspection methods”注释)

1567-1646 行定义了 AQS 的队列监测方法(1567 行进行了”Instrumentation and monitoring methods”注释)

1664-1772 行定义了 AQS 的对于 Condition 的内部支持方法(1664 行进行了”Internal support methods for Conditions”注释)

1774-1852 行定义了 AQS 的对于 Condition 的测量方法(“Instrumentation methods for conditions”注释)

最后还有个静态代码块和两个 Node 相关的方法。

AQS 框架

  • 上图中有颜色的为 Method,无颜色的为 Attribution。
  • 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

主要分析过程如下:

AQS 原理概述

AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:

AQS 使用一个 Volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 State 值的修改。

AQS 数据结构

先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。

几个方法和属性值的含义:

  • waitStatus :当前节点在队列中的状态
  • thread :表示处于该节点的线程
  • prev :前驱指针
  • predecessor :返回前驱节点,没有的话抛出 npe
  • nextWaiter :指向下一个处于 CONDITION 状态的节点(由于本篇文章不讲述 Condition Queue 队列,这个指针不多介绍)
  • next :后继指针

线程两种锁的模式:

  • SHARED : 表示线程以共享的模式等待锁
  • EXCLUSIVE : 表示线程正在以独占的方式等待锁

waitStatus 有下面几个枚举值:

  • 0 : 当一个 Node 被初始化的时候的默认值
  • CANCELLED : 为 1,表示线程获取锁的请求已经取消了
  • CONDITION : 为-2,表示节点在等待队列中,节点线程等待唤醒
  • PROPAGATE : 为-3,当前线程处在 SHARED 情况下,该字段才会使用
  • SIGNAL : 为-1,表示线程已经准备好了,就等资源释放了

同步状态 State

在了解数据结构后,接下来了解一下 AQS 的同步状态——State。AQS 中维护了一个名为 state 的字段,意为同步状态,是由 Volatile 修饰的,用于展示当前临界资源的获锁情况。

private volatile int state;

下面提供了几个访问这个字段的方法:

  • protected final int getState() 获取 State 的值
  • protected final void setState(int newState) 设置 State 的值
  • protected final boolean compareAndSetState(int expect, int update) 使用 CAS 方式更新 State

这几个方法都是 final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API 层。

AQS 重要方法与 ReentrantLock 的关联

从架构图中可以得知,AQS 提供了大量用于自定义同步器实现的 Protected 方法。自定义同步器实现的相关方法也只是为了通过修改 State 字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock 需要实现的方法如下,并不是全部):

  • protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。
  • protected boolean tryAcquire(int arg) 独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 True,失败则返回 False。
  • protected boolean tryRelease(int arg) 独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 True,失败则返回 False。
  • protected int tryAcquireShared(int arg) 共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • protected boolean tryReleaseShared(int arg) 共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 True,否则返回 False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。

以非公平锁为例,这里主要阐述一下非公平锁与 AQS 之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。

为了帮助大家理解 ReentrantLock 和 AQS 之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

加锁:

  • 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
  • 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
  • AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
  • tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。

解锁:

  • 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
  • Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
  • Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出 ReentrantLock 加锁解锁时 API 层核心方法的映射关系。

通过 ReentrantLock 理解 AQS

ReentrantLock 中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。

在非公平锁中,有一段这样的代码:

// java.util.concurrent.locks.ReentrantLock
static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
	}
  ...
}

看一下这个 Acquire 是怎么写的:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

再看一下 tryAcquire 方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer
protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

可以看出,这里只是 AQS 的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以 ReentrantLock 为例)。如果该方法返回了 True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。下面会详细解释线程是何时以及怎样被加入进等待队列中的。

线程加入等待队列

加入队列的时机

当执行 Acquire(1)时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter 加入到等待队列中去。

如何加入队列

获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
	return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。
  • Pred 指针指向尾节点 Tail。
  • 将 New 中 Node 的 Prev 指针指向 Pred。
  • 通过 compareAndSetTail 方法,完成尾节点的设置。这个方法主要是对 tailOffset 和 Expect 进行比较,如果 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,那么设置 Tail 的值为 Update 的值。
// java.util.concurrent.locks.AbstractQueuedSynchronizer

static {
	try {
		stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
		headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
		tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
		waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
		nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
	} catch (Exception ex) {
    throw new Error(ex);
  }
}

从 AQS 的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性。tailOffset 指的是 tail 对应的偏移量,所以这个时候会将 new 出来的 Node 置为当前队列的尾节点。同时,由于是双向链表,也需要将前一个节点指向尾节点。

  • 如果 Pred 指针是 Null(说明等待队列中没有元素),或者当前 Pred 指针和 Tail 指向的位置不同(说明被别的线程已经修改),就需要看一下 Enq 的方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // Must initialize
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

总结一下,线程获取锁的时候,过程大体如下:

  1. 当没有线程获取到锁时,线程 1 获取锁成功。
  2. 线程 2 申请锁,但是锁被线程 1 占有。

  1. 如果再有线程要获取锁,依次在队列中往后排队即可。

回到上边的代码,hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
	// The correctness of this depends on head being initialized
	// before tail and on head.next being accurate if the current
	// thread is first in queue.
	Node t = tail; // Read fields in reverse initialization order
	Node h = head;
	Node s;
	return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

看到这里,我们理解一下 h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); 为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?

Reference