本文参考 黑马Java并发编程
ReentrantLock
ReentrantLock的特点
-
可重入:同一个线程首次获得了锁后,有权利再次获取这把锁
-
可中断
try{ lock.lockInterruptibly() } catch (InterruptedEception e){ return; } lock.unlock();
-
可设置超时时间
lock.tryLock(1, TimeUnit.SECONDS);
-
可设置为公平锁
//ReentrantLock默认为非公平锁 //公平锁:等待队列的第一个才能拿到锁 //非公平锁:可以不进入等待队列就尝试获取锁 ReentrantLock lock = new ReentrantLock(false);
-
支持多个条件变量
Synchronized中使用wait和notify也能操作条件变量Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); //调用await方法需要先获得锁,执行后会释放锁 //被唤醒、打断或者超时会重新竞争锁,得到锁后往下执行 try{ lock.lock(); while(条件1){ try { condition1.await(); }catch (e){ 处理异常... } 执行方法... } } finally { lock.unlock(); } condition1.signal();//唤醒
AQS
概述
全称:AbstractQueuedSynchronizer,是一个实现阻塞式同步器的重要抽象父类
关键属性:
- state属性,用来表示资源的状态
- getState
- setState
- compareAndSetState
- 在子类中操作state,可以用来实现独占和共享
- head、tail、Node组成的基于FIFO的等待队列,类似Monitor 的EntryList
- ConditionObject,实现等待、唤醒机制,类似WaitSet
- exclusiveOwnerThread 独占线程
子类主要实现的方法:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
接下来尝试用AQS写一个简单的不可重入锁
// 1.创建类MyLock实现Lock接口
// 2.创建内部类Syn继承AQS,并实现三个基本方法
class MyLock2 implements Lock {
private Sync sync = new Sync();
class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
protected Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
可以看到,我们在AQS的子类Sync中实现了tryAcquire这些方法后,直接调用相对应的方法就实现了一个简单的锁,说明AQS帮我们预先写好了许多功能,我们只需要在待实现的方法中设置好State的改变方式,便可轻松实现各种需求的锁!
ReentrantLock的源码分析
接下来,我们结合ReentrantLock的源码和AQS,一起看看如何实现R锁的加锁、解锁和各种功能
① 先来看看ReentrantLock的构造方法:
public ReentrantLock() { sync = new NonfairSync(); }
可以看出默认情况下我们使用的是非公平锁,我们也主要分析非公平锁的源码
② 看看如何加锁,找到lock方法
public void lock() { sync.lock(); }
//点进sync.lock() 查看源码👇
final void lock() {
if (compareAndSetState(0, 1)) //加锁成功
setExclusiveOwnerThread(Thread.currentThread());
else //加锁失败
acquire(1);
}
这里加锁成功和之前我们写的锁一样,我们着重看看这里加锁失败会执行的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //放入一个空节点作为Dummy Head
selfInterrupt();
}
//先看tryAcquire(arg),最后会跳到NoFair的方法👇
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //如果当前state为0,会cas尝试获得锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已经获得锁,是重入情况,则把state增加值
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败,返回false
return false;
}
//如果在上一步我们没能获取到锁,我们会来到acquireQueued方法,看下👇
//在此之前,我们需要先将当前线程加入等待队列,即AddWaiter方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //无限循环
final Node p = node.predecessor();
//如果当前节点是第一个线程节点,则👆尝试获取锁,需要竞争
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//should方法中,判断前一个结点的waitStatus是否为-1
//若不为-1,则将其置为-1,返回false,再次循环一次尝试获取锁
//若为-1,则返回true,执行park阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
走到这一步,我们来看下现在的结构是怎么样的👇
③ 解锁:现在我们再进来几个节点,期间锁一直被占用:
这时如果Thread-0释放锁,会调用unlock方法,各个线程会参与竞争,来看看源码👇
public void unlock() { sync.release(1); }
//跳转一下 👇
public final boolean release(int arg) {
if (tryRelease(arg)) { //跳去看tryRelease
Node h = head;
//如果当前头节点的waitStatus不为0,则要唤醒后一个节点
if (h != null && h.waitStatus != 0)
//恢复运行,参与竞争
unparkSuccessor(h);
//那从哪里开始呢,👆 看acquireQueued那里的无限循环获取锁!
return true;
}
return false;
}
//我们先跳过来看tryRelease 👇
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //如果重入的锁已经删除完了,就真正去掉锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
//考虑可重入的情况,若解锁的是重入的锁,则返回false
return free;
}
//调回去看release 👆
看下图:
④ 可重入的实现原理:
//我们回顾一下代码,就能轻松明白,这里只放核心代码
//加锁tryAcquire方法:
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//解锁 tryRelease方法:
if (c == 0) { //如果重入的锁已经删除完了,就真正去掉锁
free = true;
setExclusiveOwnerThread(null);
}
⑤ 可打断原理:
默认情况下是不可打断的,所以我们来看下可打断代码有什么不同
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//跳入acquireInterruptibly 👇
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//继续进入doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //这里!!!
//interrupted = true; 对比不可打断
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到,在打断处我们会直接返回异常,而不是重新设置被被打断标志
⑥ 公平锁原理:简单来说就是在tryAcquire方法,线程来获取锁时,先去检查一下AQS等待队列中是否有节点,没有才能竞争锁
if (c == 0) {
//就是这个hasQueuedPredecessors()
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
⑦ 条件变量原理:
//主要是用ConditionObject
//先来看await流程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //添加结点👇
int savedState = fullyRelease(node);//完全释放锁(可重入),唤醒下一个结点
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //park住了
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//看下添加结点的关键代码
private Node addConditionWaiter() {
...
//设置结点的waitStatus为-2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
...
return node;
}
//再来看看signal方法,这里最终会跳到doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//上面这一块代码是把当前线程结点从ConditionObject队列取出
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//这里不停尝试把结点转移到AQS等待队列,如果转移失败,就尝试换下一个转移
//转移失败情况:结点放弃锁、超时
}
Q.E.D.