本文参考 黑马Java并发编程

ReentrantLock

ReentrantLock的特点

  • 可重入:同一个线程首次获得了锁后,有权利再次获取这把锁

  • 可中断

    try{
        lock.lockInterruptibly()
    } catch (InterruptedEception e){
        return;
    }
    lock.unlock();
    
  • 可设置超时时间

    lock.tryLock(1, TimeUnit.SECONDS);
    
  • 可设置为公平锁

    //ReentrantLock默认为非公平锁
    //公平锁:等待队列的第一个才能拿到锁
    //非公平锁:可以不进入等待队列就尝试获取锁
    ReentrantLock lock = new ReentrantLock(false);
    
  • 支持多个条件变量
    Synchronized中使用wait和notify也能操作条件变量

    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    //调用await方法需要先获得锁,执行后会释放锁
    //被唤醒、打断或者超时会重新竞争锁,得到锁后往下执行
    try{
        lock.lock();
        while(条件1){
            try {
                condition1.await();
            }catch (e){
                处理异常...
            }
            执行方法...
        }
    } finally {
        lock.unlock();
    }
    
    condition1.signal();//唤醒
    
    

AQS

概述

全称:AbstractQueuedSynchronizer,是一个实现阻塞式同步器的重要抽象父类

image-20211216195618917

关键属性:

  • state属性,用来表示资源的状态
    • getState
    • setState
    • compareAndSetState
    • 在子类中操作state,可以用来实现独占和共享
  • head、tail、Node组成的基于FIFO的等待队列,类似Monitor 的EntryList
  • ConditionObject,实现等待、唤醒机制,类似WaitSet
  • exclusiveOwnerThread 独占线程

子类主要实现的方法:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

接下来尝试用AQS写一个简单的不可重入锁

// 1.创建类MyLock实现Lock接口
// 2.创建内部类Syn继承AQS,并实现三个基本方法
class MyLock2 implements Lock {

    private Sync sync = new Sync();

    class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }


        protected Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

可以看到,我们在AQS的子类Sync中实现了tryAcquire这些方法后,直接调用相对应的方法就实现了一个简单的锁,说明AQS帮我们预先写好了许多功能,我们只需要在待实现的方法中设置好State的改变方式,便可轻松实现各种需求的锁!

ReentrantLock的源码分析

接下来,我们结合ReentrantLock的源码和AQS,一起看看如何实现R锁的加锁、解锁和各种功能

① 先来看看ReentrantLock的构造方法:

public ReentrantLock() { sync = new NonfairSync(); }

可以看出默认情况下我们使用的是非公平锁,我们也主要分析非公平锁的源码

② 看看如何加锁,找到lock方法

public void lock() { sync.lock(); }
//点进sync.lock() 查看源码👇
final void lock() {
    if (compareAndSetState(0, 1)) //加锁成功
        setExclusiveOwnerThread(Thread.currentThread());
    else						  //加锁失败
        acquire(1); 
}

这里加锁成功和之前我们写的锁一样,我们着重看看这里加锁失败会执行的acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //放入一个空节点作为Dummy Head
        selfInterrupt();
}
//先看tryAcquire(arg),最后会跳到NoFair的方法👇
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { //如果当前state为0,会cas尝试获得锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程已经获得锁,是重入情况,则把state增加值
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //获取锁失败,返回false
    return false;
}

//如果在上一步我们没能获取到锁,我们会来到acquireQueued方法,看下👇
//在此之前,我们需要先将当前线程加入等待队列,即AddWaiter方法
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //无限循环
            final Node p = node.predecessor();
            //如果当前节点是第一个线程节点,则👆尝试获取锁,需要竞争
            if (p == head && tryAcquire(arg)) { 
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //should方法中,判断前一个结点的waitStatus是否为-1
            //若不为-1,则将其置为-1,返回false,再次循环一次尝试获取锁
            //若为-1,则返回true,执行park阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

走到这一步,我们来看下现在的结构是怎么样的👇

image-20211216211233089

③ 解锁:现在我们再进来几个节点,期间锁一直被占用:

image-20211216211348415

这时如果Thread-0释放锁,会调用unlock方法,各个线程会参与竞争,来看看源码👇

public void unlock() { sync.release(1); }
//跳转一下 👇
public final boolean release(int arg) {
    if (tryRelease(arg)) { //跳去看tryRelease
        Node h = head;
        //如果当前头节点的waitStatus不为0,则要唤醒后一个节点
        if (h != null && h.waitStatus != 0)
            //恢复运行,参与竞争
            unparkSuccessor(h);
        	//那从哪里开始呢,👆 看acquireQueued那里的无限循环获取锁!
        return true;
    }
    return false;
}
//我们先跳过来看tryRelease 👇
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //如果重入的锁已经删除完了,就真正去掉锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    //考虑可重入的情况,若解锁的是重入的锁,则返回false
    return free;
}
//调回去看release 👆

看下图:
image-20211216213425896

image-20211216213448744

④ 可重入的实现原理:

//我们回顾一下代码,就能轻松明白,这里只放核心代码
//加锁tryAcquire方法:
else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
}
//解锁 tryRelease方法:
if (c == 0) { //如果重入的锁已经删除完了,就真正去掉锁
    free = true;
    setExclusiveOwnerThread(null);
}

⑤ 可打断原理:

默认情况下是不可打断的,所以我们来看下可打断代码有什么不同

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
//跳入acquireInterruptibly 👇
public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
//继续进入doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); //这里!!!
            	//interrupted = true; 对比不可打断
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到,在打断处我们会直接返回异常,而不是重新设置被被打断标志

⑥ 公平锁原理:简单来说就是在tryAcquire方法,线程来获取锁时,先去检查一下AQS等待队列中是否有节点,没有才能竞争锁

if (c == 0) {
    //就是这个hasQueuedPredecessors()
    if (!hasQueuedPredecessors() && 
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}

⑦ 条件变量原理:
image-20211216215717718

//主要是用ConditionObject
//先来看await流程
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //添加结点👇
    int savedState = fullyRelease(node);//完全释放锁(可重入),唤醒下一个结点
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); //park住了
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

//看下添加结点的关键代码
private Node addConditionWaiter() {
   	...
    //设置结点的waitStatus为-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    ...
    return node;
}

//再来看看signal方法,这里最终会跳到doSignal
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        //上面这一块代码是把当前线程结点从ConditionObject队列取出
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
    //这里不停尝试把结点转移到AQS等待队列,如果转移失败,就尝试换下一个转移
    //转移失败情况:结点放弃锁、超时
}

Q.E.D.


记录 • 分享 • 日常