摘要:同步器的实现根据其状态是否独占而有所不同。这个框架为同步状态的原子性管理线程的阻塞和解除阻塞以及排队提供了一种通用的机制。在需要使用同步器的目标类中,继承了的子类要求被声明为目标类的非公有内部类。类通过一组方法实现线程的阻塞和解除阻塞。
java.util.concurrent.locks包主要是提供线程通信的锁,下面看一下包中有哪些类。
Unsafeconcurrent包里的很多方法都是基于sun.misc.Unsafe这个类,Unsafe这个类从名字上可以看出是一个不安全的类,JDK也并没有把这个类开放给用户使用(但是我们可以通过一些比较hack的方式使用到这个类)。Unsafe是一个单例的类,通过静态的getUnsafe()方法获取到他的实例,可以看到,在方法中会判断调用Unsafe.getUnsafe()方法的类的类加载器是不是引导类加载器BootstrapClassLoader,一般我们开发的代码所属的类加载器会是AppClassLoader及其子类,所以此时会抛出SecurityException,告诉我们unsafe,不要用!!
@CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); if(!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } }
Unsafe类在JDK源码中经常用到,主要作用是任意内存地址位置处读写数据,以及CAS操作。它的大部分操作都是通过JNI(Java Native Interface)完成的,因此它所分配的内存需要手动free,所以是非常危险的。
Java并发中主要用到是的Unsafe中的Compare And Swap操作,CAS 操作包含三个操作数 —— 内存位置(offset)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”
// 获取类的某个字段在类的实例中内存位置的偏移量 public native long objectFieldOffset(Field var1); /* * 下面三个方法是类似的,对var1对象的偏移量是var2的字段进行CAS操作 * 预期值是var4,如果该字段当前值是var4,则更新为var5,否则什么都不做 */ public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);LockSupport
LockSupport是j.u.c包中并发控制的基础,它的底层是依赖于Unsafe实现的。LockSupport提供了Thread.suspend()和Thread.resume()的替代方案,因为suspend和resume是不安全的,所以已经被标记为deprecated。suspend()和resume()必须要成对出现,否则非常容易发生死锁。
因为suspend方法并不会释放锁,如果使用suspend的目标线程对一个重要的系统资源持有锁,那么没任何线程可以使用这个资源直到要suspend的目标线程被resumed,如果一个线程在resume目标线程之前尝试持有这个重要的系统资源锁再去resume目标线程,这两条线程就相互死锁了。
public class LockSupport { private LockSupport() {} // Cannot be instantiated. public static void unpark(Thread thread); public static void park(Object blocker); public static void parkNanos(Object blocker, long nanos); public static void parkUntil(Object blocker, long deadline); public static void park(); public static void parkNanos(long nanos); public static void parkUntil(long deadline); }
LockSupport中主要用到park和unpark方法,park阻塞当前线程,unpark解除指定线程的阻塞。而且unpark可以在park之前执行,比Thread的wait/notify更加灵活。
LockSupport中有个叫做permit(许可)的概念,unpark方法有两种情况:
如果入参的线程是阻塞的,那么解除该线程的阻塞
否则给该线程一个permit,确保该线程下一次执行park的时候不被阻塞,直接返回。
相应的,park也分为两种情况:
如果一个线程有许可的话,那么它在调用park方法时就会收回它那个许可,但是不会被阻塞,而是直接返回。但是当它再次调用park方法时,因为许可已经被用掉了,于是又成了第2种情况。
如果一个线程没有许可,那么它在调用park方法时就会被阻塞,直到以下事件之一发生才会解除阻塞。
有其它线程调用unpark方法给它发许可
其他线程调用了当前线程的interrupt方法
阻塞过时(调用parkNanos(long nanos)阻塞指定时间长度或调用parkUntil(long deadline)阻塞直到指定的时间戳)
虚假唤醒(Spurious wakeup)
需要注意的一点是,一个线程一个时刻最多只能有一个许可,即使你多次调用unpark方法它也只能有一个许可.
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.
park, parkUntil, parkNanos这3个方法都分别对应有一个带Object blocker参数的方法,表示把线程阻塞在这个对象上,类似于synchronized()中的锁对象,以允许监视工具和诊断工具确定线程受阻塞的原因。Java官方建议使用带blocker参数的park方法,并用this关键字作为blocker参数。
AbstractOwnableSynchronizer可以由线程以独占方式拥有的同步器。此类为创建锁和相关同步器(伴随着所有权的概念)提供了基础。AbstractOwnableSynchronizer 类本身不管理或使用此信息。但是,子类和工具可以使用适当维护的值帮助控制和监视访问以及提供诊断。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
这是一个抽象类,在j.u.c包中它有2个子类:AbstractQueuedSynchronizer和AbstractQueuedLongSynchronizer。同步器的实现根据其状态是否独占而有所不同。独占状态的同步器,在同一时间只有一个线程可以通过阻塞点,而共享状态的同步器可以同时有多个线程在执行。一般锁的实现类往往只维护独占状态,但是,例如计数信号量在数量许可的情况下,允许多个线程同时执行。为了使框架能得到广泛应用,这两种模式都要支持。
AbstractQueuedSynchronizer在JDK1.5之前,线程同步是通过synchronized关键字实现的,
从JDK1.5开始提供的java.util.concurrent包中,大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer类(下称AQS类)而构建的。这个框架为同步状态的原子性管理、线程的阻塞和解除阻塞以及排队提供了一种通用的机制。
线程同步涉及两个操作,对临界资源的竞争和释放。在j.u.c包中,这两个操作的设计思想是:
acquire
while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued;
release
update synchronization state; if (state may permit a blocked thread to acquire) unblock one or more queued threads;
为了实现上述操作,需要下面三个基本组件的相互协作:
同步状态的原子性管理;
线程的阻塞与解除阻塞;
队列的管理;
AQS类的一般用法是继承,在子类中定义管理同步状态的方法,并且定义这个AQS实现类在acquire和release操作时同步状态变化对应的含义。AQS类负责管理线程的阻塞和线程队列。在需要使用同步器的目标类中,继承了AQS的子类要求被声明为目标类的非公有内部类。例如下图j.u.c包中,在需要使用AQS控制线程同步时,都是在类中声明一个内部类并继承AQS。
AQS类支持共享和排他两种模式,排他模式下,只能有一个线程acquire,共享模式下可以多个线程同时acquire。
1. 同步状态
AQS类使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSetState操作来读取和更新这个状态。compareAndSetState仅当同步状态拥有一个期望值的时候,才会被原子地设置成新值。
private volatile int state; protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
将同步状态限制为一个32位的整型是出于实践上的考量。虽然JSR166也提供了64位long字段的原子性操作,但这些操作在很多平台上还是使用内部锁的方式来模拟实现的,这会使同步器的性能可能不会很理想。JDK1.6中新增的java.util.concurrent.locks.AbstractQueuedLongSynchronizer类,就是使用long变量维护同步状态的一个AbstractOwnableSynchronizer版本。目前来说,32位的状态对大多数应用程序都是足够的。在j.u.c包中,只有一个同步器类可能需要多于32位来维持状态,那就是CyclicBarrier类,所以它用了锁(该包中大多数更高层次的工具亦是如此)。
基于AQS的具体实现类必须根据暴露出的状态相关的方法定义tryAcquire和tryRelease方法,以控制acquire和release操作。当同步状态满足时,tryAcquire方法必须返回true,而当新的同步状态允许后续acquire时,tryRelease方法也必须返回true。这些方法都接受一个int类型的参数用于传递想要的状态。例如:可重入锁中,当某个线程从条件等待中返回,然后重新获取锁时,为了重新建立循环计数的场景。很多同步器并不需要这样一个参数,因此忽略它即可。
2. 队列
整个框架的关键就是如何管理被阻塞的线程的队列,该队列是严格的FIFO队列,因此,框架不支持基于优先级的同步。
队列中的元素Node(AQS的内部类)就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:
static final class Node { volatile int waitStatus; volatile Node prev; // 前驱节点 volatile Node next; // 后继节点 volatile Thread thread; // 入队列时的当前线程 Node nextWaiter; // 存储condition队列中的后继节点 /* waitStatus */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /* 标识节点的等待是共享模式或排他模式 */ static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; }
waitStatus的含义:
CANCELLED,值为1,表示当前的线程因超时或中断被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程处于阻塞状态,当前节点线程释放时需要对后继进行unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示在同步器在共享模式下,当前节点释放后传播到其他节点;
值为0,表示当前节点在sync队列中,等待着获取锁
enq节点入队,如果队列为空则先初始化队列,创建一个空节点作为头节点。
private transient volatile Node head; // 队列头节点 private transient volatile Node tail; // 队列尾节点 /* 入队 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 延迟初始化,队列为空时创建一个空Node,head和tail都指向这个Node if (compareAndSetHead(new Node())) tail = head; } else { // 死循环CAS操作,把新节点和队列当前尾节点做双向绑定 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter先判断tail如果不为空则进行一次快速的插入,否则使用enq进行可能包括队列初始化的入队操作。
/* * 把当前线程用Node包装起来并入队 * mode有两种情况: Node.EXCLUSIVE/Node.SHARED * this.nextWaiter = mode; */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
3. 阻塞
AQS可以根据具体的场景提供exclusive模式和shared模式,在exclusive模式下,同一时刻最多只能有一个线程能够处于成功获取的状态,排他锁是一个exclusive模式的例子,shared模式则可以多个线程一起获取成功,如多个许可的Semaphore。
AQS类通过一组aquire/release方法实现线程的阻塞和解除阻塞。在共享模式和独占模式下,又有所区别。
子类需要去实现以下方法:
/* 独占模式 */ protected boolean tryAcquire(int arg) protected boolean tryRelease(int arg) /* 共享模式 */ protected int tryAcquireShared(int arg) protected boolean tryReleaseShared(int arg)独占模式下的acquire
首先尝试一次tryAcquire, 如果不成功则添加一个Node节点到等待队列反复重试。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
当前线程包装为node对象加入队尾,acquireQueued则在循环中判断node的前驱节点是不是head,如果是则继续尝试tryAcquire,如果acquire成功则说明成功通过了acquire,则将自己设置为新的head。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; /* 死循环中不断重试acquire */ for (;;) { final Node p = node.predecessor(); /* 尝试acquire,成功则把自己设为队列head节点 */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* acquire失败后判断是否park阻塞,还是要继续重试acquire */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /* pred是node的前驱节点,此方法用于判断node节点acquire失败后是否park阻塞 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前驱节点状态是SIGNAL,release时会signal唤醒node * 所以node在acquire失败时应当继续park阻塞 */ return true; if (ws > 0) { /* * 前驱节点pred状态是CANCELLED * 向前遍历队列,直到找到状态不是CANCELLED的节点 * 把这个节点和node设置为前驱后继关系 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 前驱节点的状态是0或PROPAGATE * 前驱节点状态更新为SIGNAL,release时唤醒node节点 * node节点则不需要park,继续尝试acquire */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /* 当前线程park,并返回中断状态 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }共享模式下的acquire
子类实现tryAcquireShared(arg), 调用tryAcquireShared返回值小于0说明获取失败,等于0表示获取成功,但是接下来的acquireShared不会成功,大于0说明tryAcquireShared获取成功并且接下来的acquireShared也可能成功。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
实现共享模式acquire的主要逻辑在下边的doAcquireShared方法中,把当前线程封装为Node加入队列,向前遍历队列,直到当前节点的前驱是头节点,然后尝试tryAcquireShared,tryAcquireShared成功后(结果>=0),调用setHeadAndPropagate。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared中tryAcquireShared返回值大于0,head为null或head的waitStatus小于0,满足以上条件情况下,判断当前节点的后继节点若为null或是共享类型,调用doReleaseShared唤醒后继节点以确保共享沿队列继续传播。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don"t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }独占模式下的release
如果tryRelease返回了true,说明可以唤醒其他线程,则判断head不为null并且waitStatus不为0的情况下去unpark后继节点。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
unparkSuccessor中当node的后继节点为null或waitStatus > 0说明
next已经取消。此时需要从tail向前遍历找到离node最近的没有取消的节点进行unpark。如果node的后继节点s不是null而且waitStatus < 0则unpark节点s。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }共享模式下的release
tryReleaseShared返回true,调用doReleaseShared,允许一个等待的节点 acquire成功。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
和独占模式的release只unpark一个后继节点不同的是,共享模式下 唤醒行为需要向后传播。doReleaseShared会从head开始往后检查状态,如果节点是SIGNAL状态,就唤醒它的后继节点。如果是0就标记为PROPAGATE, 等它释放锁的时候会再次唤醒后继节点。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77178.html
摘要:源码学习笔记基于包源码大致分为以下几组对包集合框架的扩展更好的支持多线程并发操作线程池相关锁基本数据类型的原子性封装 Java concurrent 源码学习笔记基于JDK1.8 concurrent包源码大致分为以下几组: 对util包集合框架的扩展(更好的支持多线程并发操作) 线程池相关 锁 基本数据类型的原子性封装 showImg(https://segmentfault.c...
摘要:总结总的来说,操作顺序是进入队列唤醒,成功获得锁将状态变为并将其从转到使再次获得锁执行余下代码。当然这是理由状态下,为了讨论及的原理,实际的操作时序也有可能变化。 AQS Condition 最近面试被问到java concurrent包下有哪些熟悉的,用过的工具。因此来回顾一下,这些工具的底层实现,AbstractQueuedSynchronizer。在网上看到了其他人的一些技术博客...
摘要:总结总的来说,操作顺序是进入队列唤醒,成功获得锁将状态变为并将其从转到使再次获得锁执行余下代码。当然这是理由状态下,为了讨论及的原理,实际的操作时序也有可能变化。 AQS Condition 最近面试被问到java concurrent包下有哪些熟悉的,用过的工具。因此来回顾一下,这些工具的底层实现,AbstractQueuedSynchronizer。在网上看到了其他人的一些技术博客...
阅读 2984·2021-10-19 11:46
阅读 979·2021-08-03 14:03
阅读 2934·2021-06-11 18:08
阅读 2905·2019-08-29 13:52
阅读 2744·2019-08-29 12:49
阅读 480·2019-08-26 13:56
阅读 924·2019-08-26 13:41
阅读 849·2019-08-26 13:35