摘要:能够异步的执行任务,并且通常管理一个线程池。这样我们就不用手动的去创建线程了,线程池中的所有线程都将被重用。在之后不能再提交任务到线程池。它不使用固定大小的线程池,默认情况下是主机的可用内核数。
原文地址: Java 8 Concurrency Tutorial: Threads and Executors
Java 5 初次引入了Concurrency API,并在随后的发布版本中不断优化和改进。这篇文章的大部分概念也适用于老的版本。我的代码示例主要聚焦在Java 8上,并大量适用 lambda 表达式和一些新特性。如果你还不熟悉 lambda 表达式,建议先阅读 Java 8 Tutorial。
Threads 和 Runnables所有现代操作系统都是通过进程和线程来支持并发的。进程通常是相互独立运行的程序实例。例如,你启动一个 Java 程序,操作系统会产生一个新的进程和其他程序并行运行。在这些进程中可以利用线程同时执行代码。这样我们就可以充分利用 CPU。
Java 从 JDK 1.0 开始就支持线程。在开始一个新线程之前,必须先指定运行的代码,通常称为 Task。下面是通过实现 Runnable 接口来启动一个新线程的例子:
Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }; task.run(); Thread thread = new Thread(task); thread.start(); System.out.println("Done!");
由于 Runnable 是一个函数式接口,我们可以使用 lambda 表达式来打印线程的名字到控制台。我们直接在主线程上执行Runnable,然后开始一个新线程。在控制台你将看到这样的结果:
Hello main Hello Thread-0 Done!
或者:
Hello main Done! Hello Thread-0
由于是并发执行,我们无法预测 Runnable 是在打印 Done 之前还是之后调用,顺序不是不确定的,因此并发编程成为大型应用程序开发中一项复杂的任务。
线程也可以休眠一段时间,例如下面的例子:
Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(runnable); thread.start();
执行上面的代码会在两个打印语句之间停留1秒钟。TimeUnit 是一个时间单位的枚举,或者可以通过调用 Thread.sleep(1000) 实现。
使用 Thread 类可能非常繁琐且容易出错。由于这个原因,在2004年,Java 5版本引入了 Concurrency API。API 位于 java.util.concurrent 包下,包含了许多有用的有关并发编程的类。从那时起,每个新发布的 Java 版本都增加了并发 API,Java 8 也提供了新的类和方法来处理并发。
现在我们来深入了解一下Concurrency API中最重要的部分 - executor services。
ExecutorsConcurrency API 引入了 ExecutorService 的概念,作为处理线程的高级别方式用来替代 Threads。 Executors 能够异步的执行任务,并且通常管理一个线程池。这样我们就不用手动的去创建线程了,线程池中的所有线程都将被重用。从而可以在一个
executor service 的整个应用程序生命周期中运行尽可能多的并发任务。
下面是一个简单的 executors 例子:
ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }); // => Hello pool-1-thread-1
Executors 类提供了方便的工厂方法来创建不同类型的 executor services 。在这个例子中使用了只执行一个线程的 executor。
执行结果看起来和上面的示例类似,但是你会注意到一个重要区别:Java 进程永远不会停止,执行者必须明确的停止它,否则它会不断的接受新的任务。
ExecutorService 为此提供了两种方法:shutdown() 等待当前任务执行完毕,而 shutdownNow() 则中断所有正在执行的任务,并立即关闭执行程序。在 shudown 之后不能再提交任务到线程池。
下面是我关闭程序的首选方式:
try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("tasks interrupted"); } finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished"); }
执行者调用 shutdown 关闭 executor,在等待 5 秒钟钟后,不管任务有没有执行完毕都调用 shutdownNow 中断正在执行的任务而关闭。
Callables 和 Futures除了 Runnable 以外,executors 还支持 Callable 任务,和 Runnable 一样是一个函数式接口,但它是有返回值的。
下面是一个使用 lambda 表达式定义的 Callable ,在睡眠 1 秒后返回一个整形值。
Callabletask = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } };
和 Runnable 一样,Callable 也可以提交到 executor services,但是执行的结果是什么?由于 submit() 不等待任务执行完成,executor service 不能直接返回调用的结果。相对应的,它返回一个 Future 类型的结果,使用 Future 可以检索实际执行结果。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(task); System.out.println("future done? " + future.isDone()); Integer result = future.get(); System.out.println("future done? " + future.isDone()); System.out.print("result: " + result);
在将 Callable 提交给 executor 后,首先通过 isDone() 来检查 future 是否执行完毕。我敢肯定,情况并非如此,因为上面的调用在返回整数之前睡眠了 1 秒钟。
调用方法 get() 会阻塞当前线程,直到 callable 执行完成返回结果,现在 future 执行完成,并在控制台输出下面的结果:
future done? false future done? true result: 123
Future 与 executor service 紧密结合,如果关闭 executor service, 每个 Future 都会抛出异常。
executor.shutdownNow(); future.get();
这里创建 executor 的方式与前面的示例不同,这里使用 newFixedThreadPool(1) 来创建一个线程数量为 1 的线程池来支持 executor, 这相当于 newSingleThreadExecutor() ,稍后我们我们会通过传递一个大于 1 的值来增加线程池的大小。
Timeouts任何对 future.get()的调用都会阻塞并等待 Callable 被终止。 在最坏的情况下,一个可调用函数将永远运行,从而使应用程序无法响应。可以简单地通过超时来抵消这些情况:
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } }); future.get(1, TimeUnit.SECONDS);
执行上面的代码会抛出 TimeoutException
Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)
指定了 1 秒钟的最长等待时间,但是在返回结果之前,可调用事实上需要 2 秒钟的时间。
InvokeAllExecutors 支持通过 invokeAll() 批量提交多个 Callable 。这个方法接受一个 Callable 类型集合的参数,并返回一个 Future 类型的 List。
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);
在这个例子中,我们利用 Java 8 的流来处理 invokeAll 调用返回的所有 Future。 我们首先映射每个 Future 的返回值,然后将每个值打印到控制台。 如果还不熟悉流,请阅读Java 8 Stream Tutorial。
InvokeAny批量提交可调用的另一种方法是 invokeAny(),它与 invokeAll() 略有不同。 该方法不会返回所有的 Future 对象,它只返回第一个执行完毕任务的结果。
Callablecallable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; }; }
我们使用这种方法来创建一个有三个不同睡眠时间的 Callable。 通过 invokeAny()将这些可调用对象提交给 executor ,返回最快执行完毕结果,在这种情况下,task2:
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3)); String result = executor.invokeAny(callables); System.out.println(result); // => task2
上面的例子使用通过 newWorkStealingPool() 创建的另一种类型的 executor。 这个工厂方法是 Java 8 的一部分,并且返回一个类型为 ForkJoinPool的 executor,它与正常的 executor 略有不同。 它不使用固定大小的线程池,默认情况下是主机CPU的可用内核数。
Scheduled Executors我们已经学会了如何在 Executors 上提交和运行任务。 为了多次定期运行任务,我们可以使用 scheduled thread pools。
ScheduledExecutorService 能够安排任务定期运行或在一段时间过后运行一次。
下面代码示例一个任务在三秒钟后运行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); ScheduledFuture> future = executor.schedule(task, 3, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.sleep(1337); long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS); System.out.printf("Remaining Delay: %sms", remainingDelay);
调度任务产生一个类型为 ScheduledFuture的值,除了 Future 之外,它还提供getDelay() 方法来检索任务执行的剩余时间。
为了定时执行的任务,executor 提供了两个方法 scheduleAtFixedRate() 和
scheduleWithFixedDelay() 。 第一种方法能够执行具有固定时间间隔的任务,例如, 每秒一次:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
此外,此方法还可以设置延迟时间,该延迟描述了首次执行任务之前的等待时间。
scheduleWithFixedDelay() 方法与 scheduleAtFixedRate() 略有不同,不同之处是它们的等待时间,scheduleWithFixedDelay() 的等待时间是在上一个任务结束和下一个任务开始之间施加的。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); } }; executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
本示例在执行结束和下一次执行开始之间延迟 1 秒。 初始延迟为 0,任务持续时间为 2 秒。 所以我们结束了一个0s,3s,6s,9s等的执行间隔。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70831.html
摘要:在这个示例中我们使用了一个单线程线程池的。在延迟消逝后,任务将会并发执行。这是并发系列教程的第一部分。第一部分线程和执行器第二部分同步和锁第三部分原子操作和 Java 8 并发教程:线程和执行器 原文:Java 8 Concurrency Tutorial: Threads and Executors 译者:BlankKelly 来源:Java8并发教程:Threads和Execut...
摘要:前言在上一篇文章中多线程奇幻之旅算法实现线程安全,我们介绍了和方式实现线程安全类的方法,两种方式一个是锁定阻塞方式,一个是非阻塞方式。 前言 在上一篇文章中《Java多线程奇幻之旅——CAS算法实现线程安全》,我们介绍了Synchronized和CAS方式实现线程安全类的方法,两种方式一个是锁定阻塞方式,一个是非阻塞方式。本文专注于两种实现方式效率问题。本文是上篇文章的延续,会借用到上...
摘要:无限期等待另一个线程执行特定操作。线程安全基本版请说明以及的区别值都不能为空数组结构上,通过数组和链表实现。优先考虑响应中断,而不是响应锁的普通获取或重入获取。只是在最后获取锁成功后再把当前线程置为状态然后再中断线程。 前段时间在慕课网直播上听小马哥面试劝退(面试虐我千百遍,Java 并发真讨厌),发现讲得东西比自己拿到offer还要高兴,于是自己在线下做了一点小笔记,供各位参考。 课...
摘要:序本文主要研究一下的这里如果的为,则会创建这里如果是的话,参数传递的是如果是同步的方法,则传的值是这里创建了一个,然后调用,这里使用了可以看到这里使用的是的方法注意这个方法是才有的,也是在这里使用的由于默认是使用创建的, 序 本文主要研究一下jdk httpclient的executor HttpClientImpl java.net.http/jdk/internal/net/htt...
摘要:用于限定中线程数的最大值。该线程池中的任务队列维护着等待执行的对象。线程池和消息队列笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。以上是笔者在学习实践之后对于多线程和消息队列的粗浅认识,初学者切莫混淆两者的作用。 多线程编程很难,难点在于多线程代码的执行不是按照我们直觉上的执行顺序。所以多线程编程必须要建立起一个宏观的认识。 线程池是多线程编程中的一个重要概念。为了能够更...
阅读 3976·2021-11-22 13:53
阅读 3582·2021-11-19 11:29
阅读 1164·2021-09-08 09:35
阅读 3121·2020-12-03 17:26
阅读 477·2019-08-29 16:06
阅读 2077·2019-08-26 13:50
阅读 1147·2019-08-23 18:32
阅读 2109·2019-08-23 18:12