等待通知机制==阻塞唤醒机制

Condition简介

  • Object的wait和notify是与monitor配合完成线程间等待-通知机制的,只能拥有一个同步队列和一个等待队列
  • Condition与Lock配置完成等待-通知机制的,可以拥有一个同步队列和多个等待队列
    前边是java底层级别的,后者是语言级别的,具有更高可控性和扩展性
    两者的不同:
  1. condition支持不响应中断
  2. condition支持多个等待队列
  3. condition支持超时时间的设置

condition的API介绍

  1. await() thorws interruptedException
    当前线程进入等待状态,直到被其他线程通知,中断
  2. long awaitNanos(long nanosTimeout)
    当前线程进入等待状态直到被其他通知,中断或者超时
  3. boolean awaitUntil(Date deadline)
    当前线程进入等待状态直到被其他线程通知,中断或者到达某个时间

  1. void siginal()
    通知等待在condition上的第一个线程,将该线程从等待队列转移到同步队列,如果在同步队列能够获取到锁,则可以从等待方法中返回
  2. void siginal()
    通知所有等待在condition上的线程

condition实现原理

等待队列

AQS内部维护了一个同步队列获取锁失败的线程尾插入到同步队列中。
condition内部也维护了一个等待队列,所有调用condition.await的线程加入到等待队列中。
等待队列是一个单向队列,而同步队列是一个双向队列,复用相同Node节点,但是等待队列不带头节点

lock.newCondition()可创建多个condition对象,也就是多个等待队列

await实现原理

当调用condition.await方法后,会使当前线程进入到等待队列,然后释放lock,直到被signal通知后使当前线程从等待队列移动到同步队列中,直到获取锁才会从await方法返回,或者被中断

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
	// 1. 将当前线程包装成Node,尾插入到等待队列中
    Node node = addConditionWaiter();
	// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //当前节点进入到同步队列 或者被中断,退出循环
    while (!isOnSyncQueue(node)) { //怎样退出循环 1. break  2. 逻辑判断
		// 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
	// 4. 自旋等待获取到同步状态(即获取到lock)   获取到锁才会退出await
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
	// 5. 处理被中断的情况
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

  1. 怎样将当前线程添加到等待队列中?? addConditionWaiter尾插法
private Node addConditionWaiter() {
    Node t = lastWaiter;
	//将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
		//尾插入
        t.nextWaiter = node;
	//更新lastWaiter
    lastWaiter = node;
    return node;
}
  1. 怎样释放锁 fullyRelease
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
	//将当前线程包装成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
		//尾插入
        t.nextWaiter = node;
	//更新lastWaiter
    lastWaiter = node;
    return node;
}

  1. 怎样从await退出 当前节点进入到同步队列 或者被中断,获取到锁

signal实现原理

调用siginal将等待队列中最先进来的节点/头节点移动到同步队列中,使其有机会获得lock

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
	Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

首先检查当前线程是否获取锁,若未获取则抛出异常

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
	//1. 更新状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
	//2.将该节点移入到同步队列中去
    Node p = enq(node);   //将节点插入到同步队列中去
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

总结:
condition的siginal将等待队列中的头节点移动到同步队列,然后判断其是否获取到lock锁,如果获取成功,则调用LockSupport.unpark()方法通知await方法

await与signal/signalAll的结合思考

线程A通过lock.lock()获取锁成功后,调用condition.await()方法释放锁并进入等待队列
线程B通过lock.lock()获取锁成功后,调用condition.signal()方法使线程A从等待队列进入到同步队列中,并自旋获取锁
线程A成功获取到锁,从而使其能够从await方法中返回执行后续操作

condition等待通知例子

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

LockSupport简介

LockSupport是线程的阻塞原语,用来阻塞线程和唤醒线程。每个使用LockSupport的线程都会与一个许可关联,若许可可用,则调用park()方法立即返回,若许可不能用,则unpark()使其可用

API

void park() 阻塞当前线程

void unpark(Thread thread) 唤醒处于阻塞的指定线程

public class LockSupportDemo {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            LockSupport.park();
            System.out.println(Thread.currentThread().getName() + "被唤醒");
        });
        thread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LockSupport.unpark(thread);
    }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐