摘要:所以也很容易想到可以利用等待通知机制来实现,和上文的并发包入坑指北之阻塞队列的类似。
前言
在面试过程中聊到并发相关的内容时,不少面试官都喜欢问这类问题:
当 N 个线程同时完成某项任务时,如何知道他们都已经执行完毕了。
这也是本次讨论的话题之一,所以本篇为『并发包入坑指北』的第二篇;来聊聊常见的并发工具。
自己实现其实这类问题的核心论点都是:如何在一个线程中得知其他线程是否执行完毕。
假设现在有 3 个线程在运行,需要在主线程中得知他们的运行结果;可以分为以下几步:
定义一个计数器为 3。
每个线程完成任务后计数减一。
一旦计数器减为 0 则通知等待的线程。
所以也很容易想到可以利用等待通知机制来实现,和上文的『并发包入坑指北』之阻塞队列的类似。
按照这个思路自定义了一个 MultipleThreadCountDownKit 工具,构造函数如下:
考虑到并发的前提,这个计数器自然需要保证线程安全,所以采用了 AtomicInteger。
所以在初始化时需要根据线程数量来构建对象。
计数器减一当其中一个业务线程完成后需要将这个计数器减一,直到减为0为止。
/** * 线程完成后计数 -1 */ public void countDown(){ if (counter.get() <= 0){ return; } int count = this.counter.decrementAndGet(); if (count < 0){ throw new RuntimeException("concurrent error") ; } if (count == 0){ synchronized (notify){ notify.notify(); } } }
利用 counter.decrementAndGet() 来保证多线程的原子性,当减为 0 时则利用等待通知机制来 notify 其他线程。
等待所有线程完成而需要知道业务线程执行完毕的其他线程则需要在未完成之前一直处于等待状态,直到上文提到的在计数器变为 0 时得到通知。
/** * 等待所有的线程完成 * @throws InterruptedException */ public void await() throws InterruptedException { synchronized (notify){ while (counter.get() > 0){ notify.wait(); } if (notifyListen != null){ notifyListen.notifyListen(); } } }
原理也很简单,一旦计数器还存在时则会利用 notify 对象进行等待,直到被业务线程唤醒。
同时这里新增了一个通知接口可以自定义实现唤醒后的一些业务逻辑,后文会做演示。
并发测试主要就是这两个函数,下面来做一个演示。
初始化了三个计数器的并发工具 MultipleThreadCountDownKit
创建了三个线程分别执行业务逻辑,完毕后执行 countDown()。
线程 3 休眠了 2s 用于模拟业务耗时。
主线程执行 await() 等待他们三个线程执行完毕。
通过执行结果可以看出主线程会等待最后一个线程完成后才会退出;从而达到了主线程等待其余线程的效果。
MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3); multipleThreadKit.setNotify(() -> LOGGER.info("三个线程完成了任务"));
也可以在初始化的时候指定一个回调接口,用于接收业务线程执行完毕后的通知。
当然和在主线程中执行这段逻辑效果是一样的(和执行 await() 方法处于同一个线程)。
CountDownLatch当然我们自己实现的代码没有经过大量生产环境的验证,所以主要的目的还是尝试窥探官方的实现原理。
所以我们现在来看看 juc 下的 CountDownLatch 是如何实现的。
通过构造函数会发现有一个 内部类 Sync,他是继承于 AbstractQueuedSynchronizer ;这是 Java 并发包中的基础框架,都可以多带带拿来讲了,所以这次重点不是它,今后我们再着重介绍。
这里就可以把他简单理解为提供了和上文类似的一个计数器及线程通知工具就行了。countDown
其实他的核心逻辑和我们自己实现的区别不大。
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
利用这个内部类的 releaseShared 方法,我们可以理解为他想要将计数器减一。
看到这里有没有似曾相识的感觉。
没错,在 JDK1.7 中的 AtomicInteger 自减就是这样实现的(利用 CAS 保证了线程安全)。
只是一旦计数器减为 0 时则会执行 doReleaseShared 唤醒其他的线程。
这里我们只需要关心红框部分(其他的暂时不用关心,这里涉及到了 AQS 中的队列相关),最终会调用 LockSupport.unpark 来唤醒线程;就相当于上文调用 object.notify()。
所以其实本质上还是相同的。
await其中的 await() 也是借用 Sync 对象的方法实现的。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //判断计数器是否还未完成 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
一旦还存在未完成的线程时,则会调用 doAcquireSharedInterruptibly 进入阻塞状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
同样的由于这也是 AQS 中的方法,我们只需要关心红框部分;其实最终就是调用了 LockSupport.park 方法,也就相当于执行了 object.wait() 。
所有的业务线程执行完毕后会在计数器减为 0 时调用 LockSupport.unpark 来唤醒线程。
等待线程一旦计数器 > 0 时则会利用 LockSupport.park 来等待唤醒。
这样整个流程也就串起来了,它的使用方法也和上文的类似。
就不做过多介绍了。
实际案例同样的来看一个实际案例。
在上一篇《一次分表踩坑实践的探讨》提到了对于全表扫描的情况下,需要利用多线程来提高查询效率。
比如我们这里分为了 64 张表,计划利用 8 个线程来分别处理这些表的数据,伪代码如下:
CountDownLatch count = new CountDownLatch(64); ConcurrentHashMap total = new ConcurrentHashMap(); for(Integer i=0;i<=63;i++){ executor.execute(new Runnable(){ @Override public void run(){ List value = queryTable(i); total.put(value,NULL); count.countDown(); } }) ; } count.await(); System.out.println("查询完毕");
这样就可以实现所有数据都查询完毕后再做统一汇总;代码挺简单,也好理解(当然也可以使用线程池的 API)。
总结CountDownLatch 算是 juc 中一个高频使用的工具,学会和理解他的使用会帮助我们更容易编写并发应用。
文中涉及到的源码:
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/communication/MultipleThreadCountDownKit.java
你的点赞与分享是对我最大的支持
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/74357.html
摘要:自己实现在自己实现之前先搞清楚阻塞队列的几个特点基本队列特性先进先出。消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。最终的队列大小为,可见线程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 较长一段时间以来我都发现不少开发者对 jdk 中的 J.U.C(java.util.c...
摘要:注意版本的是普通的超集,包含了所有正常版的功能,可以理解为。因为识别的还是之前的版本。安装好以后就可以愉快地使用各种库了。 写在前面 之前搞树莓派,opencv的contrib版本死活装不上,最后用C++版本四线程编译了一天, 浪费生命的玩意儿我明明记得之前,pip install opencv-contrib是可以安装的......,年级大了,老了最近终于找到了一篇推文,原来是pip...
摘要:最近业务需要抽离,抽离出来的应用需要做成第三方包的形式,可以在任何也没那么神奇,例如有些版本就没测试版本项目中,直接安装使用,所以这里还是需要发包到。第一次发包我是先发到环境,看下发包还是不是符合我的预期,毕竟很长时间没发过包。 最近业务需要抽离,抽离出来的应用需要做成 Django 第三方包的形式,可以在任何 Django(也没那么神奇,例如有些版本就没测试)版本项目中,直接安装使用...
摘要:模块什么是模块什么是模块化玩过游戏的朋友应该知道,一把装配完整的步枪,一般是枪身消音器倍镜握把枪托。更重要的是,其它大部分语言都支持模块化。这一点与规范完全不同。模块输出的是值的缓存,不存在动态更新。 1.模块 1.1 什么是模块?什么是模块化? 玩过FPS游戏的朋友应该知道,一把装配完整的M4步枪,一般是枪身+消音器+倍镜+握把+枪托。 如果把M4步枪看成是一个页面的话,那么我们可以...
阅读 2397·2021-11-19 09:59
阅读 1917·2019-08-30 15:55
阅读 890·2019-08-29 13:30
阅读 1312·2019-08-26 10:18
阅读 3065·2019-08-23 18:36
阅读 2362·2019-08-23 18:25
阅读 1141·2019-08-23 18:07
阅读 412·2019-08-23 17:15