资讯专栏INFORMATION COLUMN

高并发 - AbstractQueuedSynchronizer

thursday / 1350人阅读

摘要:温馨提醒队列是队列的变种,队列等待采用自旋,的队列等待采用。表示对应线程是否应当阻塞,节点是正占有锁的线程的,其值为,处于后驱节点的线程才会去,由子类实现。

温馨提醒

AbstractQueuedSynchronizer队列是CLH队列的变种,CLH队列等待采用自旋,AQS的队列等待采用LockSupport#park。

Node.waitStatus表示对应线程是否应当阻塞,

head节点是正占有锁的线程的,其thread值为null,处于head后驱节点的线程才会去tryAcquire,tryAcquire由子类实现。

入队在tail,出队在head

以下必须要子类实现:
/**
 * exclusive mode 
 */
boolean tryAcquire(int arg) 
/**
 * exclusive mode.
 */
boolean tryRelease(int arg) 
/**
 * shared mode.
 */
int tryAcquireShared(int arg)
/**
 * shared mode.
 */
boolean tryReleaseShared(int arg) 
/**
 * Returns true if synchronization is held exclusively with
 * respect to the current (calling) thread.  
 */
boolean isHeldExclusively() 

独占模式acquire
private Node enq(final Node node) {
// 无限循环,即step 1返回false(有别的线程将它的node连接到tail)也会重新将node连接到tail
    for (;;) {
        Node t = tail;
        if (t == null) {     // new一个不带任何状态的Node作为头节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {    // step 1
                t.next = node;
                return t;    // 返回当前tail
            }
        }
    }
}


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //-- 和#enq逻辑比,只是取消了循环,为了更快?
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//--
    enq(node);
    return node;    // 返回当前线程的node
}


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
    // 无限循环,直到当前线程node的前驱node是head,否则对于node状态为SIGNAL的线程会park
        for (;;) {
            final Node p = node.predecessor();
        // 如果当前线程node的前驱是head
            if (p == head && tryAcquire(arg)) {
            // head = node; 从而让其他线程也能走入该if
        // node.thread = null; 所以head永远是一个不带Thread的空节点 
        // node.prev = null;
                setHead(node);        
                p.next = null;     // 配合上面的 node.prev = null; for GC
                failed = false;
                return interrupted;
            }
        // 判断在tryAcquire失败后是否应该park,若是,则执行park
            if (shouldParkAfterFailedAcquire(p, node) && 
          // LockSupport#park,返回Thread#interrupted
          parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
// 表示当前节点应当park
    if (ws == Node.SIGNAL) return true;
// 当前节点不断向前找,直到找到一个前驱节点waitStats不是CANCELLED的为止(状态值里面只有CANCELLED是大于0的)
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
    // 中间CANCELLED的node作废
        pred.next = node;
    } 
// 0 or PROPAGATE 需要设置为SIGNAL,但仍然返回false,即don’t park
else { 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


// Acquires in exclusive mode, ignoring interrupts
public final void acquire(int arg) {
// 这里提前tryAcquire为了省去入队列操作,提高性能,因为大部分情况下可能都没有锁竞争
    if (!tryAcquire(arg) &&
    // 入队列,返回当前线程中断状态
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

独占模式release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)    // step 1
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
   
    int ws = node.waitStatus;
// 这里是SIGNAL,将head的waitStatus设为0,是为了不重复step 1
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
// 如果head(当前线程)无后驱node,或后驱node为CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
    // 从链表tail开始遍历,取出非CANCELLED的node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
// 非CANCELLED的线程unpark,继续#acquireQueued的for循环
    if (s != null)
        LockSupport.unpark(s.thread);
}















文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76325.html

相关文章

  • 并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    supernavy 评论0 收藏0
  • 并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    ddongjian0000 评论0 收藏0
  • 并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    wangdai 评论0 收藏0
  • 并发 - AbstractQueuedSynchronizer

    摘要:温馨提醒队列是队列的变种,队列等待采用自旋,的队列等待采用。表示对应线程是否应当阻塞,节点是正占有锁的线程的,其值为,处于后驱节点的线程才会去,由子类实现。 温馨提醒 AbstractQueuedSynchronizer队列是CLH队列的变种,CLH队列等待采用自旋,AQS的队列等待采用LockSupport#park。 Node.waitStatus表示对应线程是否应当阻塞, he...

    galois 评论0 收藏0
  • 并发 - 收藏集 - 掘金

    摘要:在中一般来说通过来创建所需要的线程池,如高并发原理初探后端掘金阅前热身为了更加形象的说明同步异步阻塞非阻塞,我们以小明去买奶茶为例。 AbstractQueuedSynchronizer 超详细原理解析 - 后端 - 掘金今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供的队列式...

    levius 评论0 收藏0

发表评论

0条评论

thursday

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<