摘要:的好处在于,在诊断问题的时候能够知道的原因推荐使用带有的操作函数作用用于挂起当前线程,如果许可可用,会立马返回,并消费掉许可。
LockSupport是用来创建locks的基本线程阻塞基元,比如AQS中实现线程挂起的方法,就是park,对应唤醒就是unpark。JDK中有使用的如下
LockSupport提供的是一个许可,如果存在许可,线程在调用park的时候,会立马返回,此时许可也会被消费掉,如果没有许可,则会阻塞。调用unpark的时候,如果许可本身不可用,则会使得许可可用
许可只有一个,不可累加park源码跟踪
park的声明形式有一下两大块
一部分多了一个Object参数,作为blocker,另外的则没有。blocker的好处在于,在诊断问题的时候能够知道park的原因
推荐使用带有Object的park操作park函数作用
park用于挂起当前线程,如果许可可用,会立马返回,并消费掉许可。
park(Object): 恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情
parkNanos(Object blocker, long nanos):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:过期时间到了
parkUntil(Object blocker, long deadline):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:指定的deadLine已经到了
以park的源码为例
public static void park(Object blocker) { //获取当前线程 Thread t = Thread.currentThread(); //记录当前线程阻塞的原因,底层就是unsafe.putObject,就是把对象存储起来 setBlocker(t, blocker); //执行park unsafe.park(false, 0L); //线程恢复后,去掉阻塞原因 setBlocker(t, null); }
从源码可以看到真实的实现均在 unsafe
unsafe.park核心实现如下
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time);
就是获取java线程的parker对象,然后执行它的park方法。Parker的定义如下
class Parker : public os::PlatformParker { private: //表示许可 volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { //初始化_counter _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
它继承了os::PlatformParker,内置了一个volatitle的 _counter。PlatformParker则是在不同的操作系统中有不同的实现,以linux为例
class PlatformParker : public CHeapObj { protected: //互斥变量类型 pthread_mutex_t _mutex [1] ; //条件变量类型 pthread_cond_t _cond [1] ; public: ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; //初始化条件变量,使用 pthread_cond_t之前必须先执行初始化 status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init”); // 初始化互斥变量,使用 pthread_mutex_t之前必须先执行初始化 status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); } }
上述代码均为POSIX线程接口使用,所以pthread指的也就是posixThread
parker实现如下
void Parker::park(bool isAbsolute, jlong time) { if (_counter > 0) { //已经有许可了,用掉当前许可 _counter = 0 ; //使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到0 OrderAccess::fence(); //立即返回 return ; } Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { // 线程执行了中断,返回 return; } if (time < 0 || (isAbsolute && time == 0) ) { //时间到了,或者是代表绝对时间,同时绝对时间是0(此时也是时间到了),直接返回,java中的parkUtil传的就是绝对时间,其它都不是 return; } if (time > 0) { //传入了时间参数,将其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(纳秒)存储起来,存的是绝对时间 unpackTime(&absTime, isAbsolute, time); } //进入safepoint region,更改线程为阻塞状态 ThreadBlockInVM tbivm(jt); if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { //如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回 return; } //这里表示线程互斥变量锁成功了 int status ; if (_counter > 0) { // 有许可了,返回 _counter = 0; //对互斥变量解锁 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT // Don"t catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) //debug用 sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif //将java线程所拥有的操作系统线程设置成 CONDVAR_WAIT状态 ,表示在等待某个条件的发生 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); //将java的_suspend_equivalent参数设置为true jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() if (time == 0) { //把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。 //成功返回0,否则返回错误编号 status = pthread_cond_wait (_cond, _mutex) ; } else { //同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUT status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { //WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1 //去除初始化 pthread_cond_destroy (_cond) ; //重新初始化 pthread_cond_init (_cond, NULL); } } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif //等待结束后,许可被消耗,改为0 _counter = 0 ; //释放互斥量的锁 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } //加入内存屏障指令 OrderAccess::fence(); }
从park的实现可以看到
无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回
OrderAccess::fence();在linux中实现原理如下
inline void OrderAccess::fence() { if (os::is_MP()) { #ifdef AMD64 // 没有使用mfence,因为mfence有时候性能差于使用 locked addl __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } }
内存重排序网上的验证ThreadBlockInVM tbivm(jt)
这属于C++新建变量的语法,也就是调用构造函数新建了一个变量,变量名为tbivm,参数为jt。类的实现为
class ThreadBlockInVM : public ThreadStateTransition { public: ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) { // Once we are blocked vm expects stack to be walkable thread->frame_anchor()->make_walkable(thread); //把线程由运行状态转成阻塞状态 trans_and_fence(_thread_in_vm, _thread_blocked); } ... };
_thread_in_vm 表示线程当前在VM中执行,_thread_blocked表示线程当前阻塞了,他们是globalDefinitions.hpp中定义的枚举
//这个枚举是用来追踪线程在代码的那一块执行,用来给 safepoint code使用,有4种重要的类型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的状态都是中间状态,表示线程正在由一种状态变成另一种状态,这种方式使得 safepoint code在处理线程状态时,不需要对线程进行挂起,使得safe point code运行更快,而给定一个状态,通过+1就可以得到他的转换状态 enum JavaThreadState { _thread_uninitialized = 0, // should never happen (missing initialization) _thread_new = 2, // just starting up, i.e., in process of being initialized _thread_new_trans = 3, // corresponding transition state (not used, included for completeness) _thread_in_native = 4, // running in native code . This is a safepoint region, since all oops will be in jobject handles _thread_in_native_trans = 5, // corresponding transition state _thread_in_vm = 6, // running in VM _thread_in_vm_trans = 7, // corresponding transition state _thread_in_Java = 8, // Executing either interpreted or compiled Java code running in Java or in stub code _thread_in_Java_trans = 9, // corresponding transition state (not used, included for completeness) _thread_blocked = 10, // blocked in vm _thread_blocked_trans = 11, // corresponding transition state _thread_max_state = 12 // maximum thread state+1 - used for statistics allocation };
父类ThreadStateTransition中定义trans_and_fence如下
void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即构造函数传进来de thread // transition_and_fence must be used on any thread state transition // where there might not be a Java call stub on the stack, in // particular on Windows where the Structured Exception Handler is // set up in the call stub. os::write_memory_serialize_page() can // fault and we can"t recover from it on Windows without a SEH in // place. //transition_and_fence方法必须在任何线程状态转换的时候使用 static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) { assert(thread->thread_state() == from, "coming from wrong thread state"); assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states"); //标识线程转换中 thread->set_thread_state((JavaThreadState)(from + 1)); // 设置内存屏障,确保新的状态能够被VM 线程看到 if (os::is_MP()) { if (UseMembar) { // Force a fence between the write above and read below OrderAccess::fence(); } else { // Must use this rather than serialization page in particular on Windows InterfaceSupport::serialize_memory(thread); } } if (SafepointSynchronize::do_call_back()) { SafepointSynchronize::block(thread); } //线程状态转换成最终的状态,对待这里的场景就是阻塞 thread->set_thread_state(to); CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();) }操作系统线程状态的一般取值
在osThread中给定了操作系统线程状态的大致取值,它本身是依据平台而定
enum ThreadState { ALLOCATED, // Memory has been allocated but not initialized INITIALIZED, // The thread has been initialized but yet started RUNNABLE, // Has been started and is runnable, but not necessarily running MONITOR_WAIT, // Waiting on a contended monitor lock CONDVAR_WAIT, // Waiting on a condition variable OBJECT_WAIT, // Waiting on an Object.wait() call BREAKPOINTED, // Suspended at breakpoint SLEEPING, // Thread.sleep() ZOMBIE // All done, but not reclaimed yet };unpark 源码追踪
实现如下
void Parker::unpark() { int s, status ; //给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁 //park进入wait时,_mutex会被释放 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; //存储旧的_counter s = _counter; //许可改为1,每次调用都设置成发放许可 _counter = 1; if (s < 1) { //之前没有许可 if (WorkAroundNPTLTimedWaitHang) { //默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程 status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; //释放锁 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; } } else { //一直有许可,释放掉自己加的锁,有许可park本身就返回了 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
从源码可知unpark本身就是发放许可,并通知等待的线程,已经可以结束等待了
总结park/unpark能够精准的对线程进行唤醒和等待。
linux上的实现是通过POSIX的线程API的等待、唤醒、互斥、条件来进行实现的
park在执行过程中首选看是否有许可,有许可就立马返回,而每次unpark都会给许可设置成有,这意味着,可以先执行unpark,给予许可,再执行park立马自行,适用于producer快,而consumer还未完成的场景参考地址
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/72726.html
摘要:此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。阻塞当前线程,最长不超过纳秒,返回条件在的基础上增加了超时返回。唤醒线程唤醒处于阻塞状态的线程。 LockSupport 用法简介 LockSupport 和 CAS 是Java并发包中很多并发工具控制机制的基础,它们底层其实都是依赖Unsafe实现。 LockSupport是用来创建锁和其他同步类的基本线程阻塞...
摘要:此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。调用该线程变量的方法,会唤醒该线程,并抛出异常。对于等待状态来说,它比状态多了一种唤醒方式,就是超过规定时间,那么线程会自动醒来。 一. LockSupport类介绍 LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程。主要是通过park()和unpark(thread)方法来实现阻塞和唤醒线程的操...
摘要:很多开发人员包括我,尤其是刚进入软件行业的新手,认为设置线程中断就是表示线程停止了,不往前执行了,其实不是这样的,线程中断只是一个状态而已,表示已中断,表示未中断获取线程中断状态,如果中断了返回否则返回设置线程中断不影响线程的继续执行,但是 很多Java开发人员(包括我),尤其是刚进入软件行业的新手,认为Java设置线程中断就是表示线程停止了,不往前执行了, Thread.cu...
摘要:初始时,为,当调用方法时,线程的加,当调用方法时,如果为,则调用线程进入阻塞状态。该对象一般供监视诊断工具确定线程受阻塞的原因时使用。 showImg(https://segmentfault.com/img/remote/1460000016012503); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 一、LockSupport类简介...
摘要:源码阅读创建锁和同步类中使用的基础的线程阻塞原语除非你是多线程专家,而且你要自己设计和实现阻塞式线程同步机制比如等等,否则你不需要用和。 LockSupport源码阅读 /* * 创建锁和同步类中使用的基础的线程阻塞原语 * * 除非你是多线程专家,而且你要自己设计和实现阻塞式线程同步机制(比如lock、condition等等),否则你不需要用park和unpark。这两个原语是...
阅读 3736·2021-11-23 09:51
阅读 4205·2021-11-15 11:37
阅读 3497·2021-09-02 15:21
阅读 2722·2021-09-01 10:31
阅读 853·2021-08-31 14:19
阅读 824·2021-08-11 11:20
阅读 3285·2021-07-30 15:30
阅读 1669·2019-08-30 15:54