用并发解决的问题大体可以分为“速度”和“设计可管理性”两种。 速度:用并发解决“速度”问题,不仅仅是利用多个CPU去处理分片后的问题,也就是说并发不仅仅是多个CPU的事情,也是单个CPU的事情。如果要提高程序在单个CPU上的性能,就得考虑具体情况:正常情况下,单个CPU运行多任务(task)会有上下文切换的性能损耗,但在阻塞(Blocking)的情况下就不同了,一个任务被阻塞时,CPU可以转去执行其他任务。
在单个CPU的系统中提高性能的常见示例:事件驱动编程(event-driven programming)。
Java语言采用更加传统的方式,在顺序语言的基础上提供对线程的支持。 Java的目的是“编写一次,到处运行”,而OSX之前的Macintosh操作系统版本并不支持多任务,因此Java自身提供多线程机制,让并发Java程序能够移植到Macintosh和类似的平台上。
线程带来设计上的演变:为了获取线程的结果,先是产生了轮询,后来为了解决轮询的问题,引进了静态方法的回调,再后来有了实例方法的回调,并由此引出设计模式:策略模式,最后是Java5引进的多线程编程新方法,通过隐藏细节可以更容易地处理回调——ExecutorService和Future。
package com.jc.thread; import com.jc.thinkinjava.io.util.Directory; import javax.xml.bind.DatatypeConverter; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; /** * 回调例子-前序 * * 计算文件的256位的SHA-2消息摘要 * 由于瓶颈在IO上,所以采用多线程 * * 尝试去获取线程返回的值,但发现需要另外个线程不停的轮询,这是很耗cpu资源 */ @SuppressWarnings("Duplicates") public class ReturnDigest extends Thread { private String fileName; private byte[] digest; public ReturnDigest(String fileName) { this.fileName = fileName; } @Override public void run() { try { // System.out.println(fileName); FileInputStream in = new FileInputStream(fileName); MessageDigest sha = MessageDigest.getInstance("SHA-256"); DigestInputStream digestInputStream = new DigestInputStream(in, sha); while (digestInputStream.read() != -1) ; digestInputStream.close(); digest = sha.digest(); //注意,不是DigestInputStream的方法哦 StringBuilder sb = new StringBuilder(fileName); sb.append(":").append(DatatypeConverter.printHexBinary(digest)); System.out.println(sb.toString()); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public byte[] getDigest() { return this.digest; } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); ListfileList = new ArrayList (); for (int i = 0; i < files.length; i++) { File file = files[i]; if (!file.isDirectory()) { fileList.add(file); } } ReturnDigest[] digests = new ReturnDigest[fileList.size()]; for (int i = 0; i < fileList.size(); i++) { File file = fileList.get(0); digests[i] = new ReturnDigest(file.getAbsolutePath()); digests[i].start(); } for(int i=0;i 然后为了解决轮询,产生了静态方法的回调:
package com.jc.thread.callback; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; /** * 回调例子 * 静态方法的回调 */ @SuppressWarnings("Duplicates") public class CallbackDigest implements Runnable{ private String fileName; public CallbackDigest(String fileName) { this.fileName = fileName; } @Override public void run() { try { // System.out.println(fileName); FileInputStream in = new FileInputStream(fileName); MessageDigest sha = MessageDigest.getInstance("SHA-256"); DigestInputStream digestInputStream = new DigestInputStream(in, sha); while (digestInputStream.read() != -1) ; digestInputStream.close(); byte[] digest = sha.digest(); //注意,不是DigestInputStream的方法哦 CallbackDigestUserInterface.receiveDigest(digest,fileName); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }package com.jc.thread.callback; import com.jc.thinkinjava.io.util.Directory; import com.jc.thread.DigestRunnable; import javax.xml.bind.DatatypeConverter; import java.io.File; /** * 回调例子 * 静态方法的回调 */ public class CallbackDigestUserInterface { public static void receiveDigest(byte[] digest,String fileName){ StringBuilder sb = new StringBuilder(fileName); sb.append(":").append(DatatypeConverter.printHexBinary(digest)); System.out.println(sb.toString()); } public static void main(String[] args) { File[] files = Directory.local(".", ".*"); for (File file : files) { if (!file.isDirectory()) new Thread(new DigestRunnable(file.getAbsolutePath())).start(); } } }实例方法的回调:
package com.jc.thread.callback;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Computes the SHA-256 digest of one file and reports it to a callback object.
 *
 * <p>The digest is delivered by calling
 * {@link InstanceCallbackDigestUserInterface#receiveDigest(byte[])} on the instance
 * passed to the constructor, from whichever thread executes {@link #run()}.
 */
public class InstanceCallbackDigest implements Runnable {
    private final String fileName;
    private final InstanceCallbackDigestUserInterface callback;

    /**
     * @param fileName path of the file to digest
     * @param instanceCallbackDigestUserInterface object that receives the finished digest
     */
    public InstanceCallbackDigest(String fileName,
                                  InstanceCallbackDigestUserInterface instanceCallbackDigestUserInterface) {
        this.fileName = fileName;
        this.callback = instanceCallbackDigestUserInterface;
    }

    /**
     * Reads the whole file through a {@link DigestInputStream} and passes the resulting
     * digest to the callback. Failures are printed and the callback is not invoked.
     */
    @Override
    public void run() {
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            // try-with-resources closes the stream even when a read fails.
            try (DigestInputStream digestInputStream =
                         new DigestInputStream(new FileInputStream(fileName), sha)) {
                byte[] buffer = new byte[8192];
                // The bytes themselves are discarded; reading is what feeds the digest.
                while (digestInputStream.read(buffer, 0, buffer.length) != -1) {
                    // keep reading until end of file
                }
            }
            // Note: digest() is a method of MessageDigest, not of DigestInputStream.
            byte[] digest = sha.digest();
            callback.receiveDigest(digest);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

package com.jc.thread.callback;

import com.jc.thinkinjava.io.util.Directory;
import com.jc.thread.ReturnDigest;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Callback example: uses an instance method instead of a static method for the callback.
 *
 * <p>This is a little more involved, but it has advantages:
 * <ol>
 *   <li>Each instance of this class maps to one file, so it can naturally keep track of
 *       information about that file without an extra data structure.</li>
 *   <li>An instance can easily recompute the digest of its own file when needed.</li>
 * </ol>
 *
 * <p>In practice this scheme turns out to be more flexible. It is also known as the
 * Observer pattern, as used by Swing and AWT.
 */
public class InstanceCallbackDigestUserInterface {
    private String fileName;
    private byte[] digest;

    /**
     * @param fileName path of the file this instance tracks
     */
    public InstanceCallbackDigestUserInterface(String fileName) {
        this.fileName = fileName;
    }

    /** Starts a new thread that digests the file and calls back into this instance. */
    public void calculateDigest() {
        InstanceCallbackDigest instanceCallbackDigest = new InstanceCallbackDigest(fileName, this);
        new Thread(instanceCallbackDigest).start();
    }

    /**
     * Callback invoked by {@link InstanceCallbackDigest} when the digest is ready;
     * stores it and prints this object.
     *
     * @param digest the computed digest bytes
     */
    public void receiveDigest(byte[] digest) {
        this.digest = digest;
        System.out.println(this);
    }

    /** Returns the file name followed by the hex digest, or a placeholder if none is stored yet. */
    @Override
    public String toString() {
        String result = fileName + ": ";
        if (digest != null) {
            result += DatatypeConverter.printHexBinary(digest);
        } else {
            result += "digest not available";
        }
        return result;
    }

    /**
     * Starts one digest computation per non-directory entry returned by
     * {@code Directory.local(".", ".*")} (presumably the entries of the current
     * directory matching the regex; confirm against Directory).
     */
    public static void main(String[] args) {
        File[] files = Directory.local(".", ".*");
        // Iterate over the files themselves: the earlier index loop always read element 0,
        // so every thread digested the same file.
        for (File file : files) {
            if (!file.isDirectory()) {
                new InstanceCallbackDigestUserInterface(file.getAbsolutePath()).calculateDigest();
            }
        }
    }
}

Java5引进的新方法,ExecutorService和Future:
package com.jc.thread.callback; import java.util.concurrent.Callable; public class FindMaxTask implements Callable{ private int[] data; private int start; private int end; public FindMaxTask(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } @Override public Integer call() throws Exception { int max = Integer.MAX_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) max = data[i]; } return max; } } package com.jc.thread.callback; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * Java5引入了多线程编程的一个新方法,通过隐藏细节可以更容易地处理回调 * 使用回调实现的Futrue */ public class MultithreadedMaxFinder { public static int max(int[] data) throws ExecutionException, InterruptedException { if (data.length == 1) { return data[0]; } else if (data.length == 0) { throw new IllegalArgumentException(); } FindMaxTask task1 = new FindMaxTask(data,0,data.length/2); FindMaxTask task2 = new FindMaxTask(data,data.length/2,data.length); ExecutorService executorService = Executors.newFixedThreadPool(2); Future基本线程机制future1 = executorService.submit(task1); Future future2 = executorService.submit(task2); //调用future1.get()时,这个方法会进行阻塞,等待第一个FindMaxTask完成。只有当第一个FindMaxTask完成,才会调用future2.get() return Math.max(future1.get(),future2.get()); } } 并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过使用多线程机制,这些独立任务(也被称为子任务)中的每一个都将由执行线程来驱动。
所以,使用线程机制是一种建立透明的、可扩展的程序的方法,如果程序运行得太慢,为机器增添一个CPU就能容易地加快程序的运行速度。多任务和多线程往往是使用多处理器系统的最合理方式。
// 此方法调用是对线程调度器的一种建议:我已经执行完生命周期中最重要的部分了,此刻正是切换给其他任务执行一段时间的大好时机。
Thread.yield();
Thread.yield()这个方法叫“让步”,不过没有任何机制保证这个建议会被采纳。
那为什么Java设计者不用Task而用Thread或Runnable呢? 之所以有上述的困惑(概念混淆),是因为:虽然从概念上讲,我们应该只关注任务,而不需要关注线程的细节,只需要定义任务,然后说“开始”就好,但实际情况是,在物理上创建线程的代价可能很高,因此需要人工去保存和管理它们。而且Java的线程机制是基于C的低级的P线程(pthread)方式,所以才导致任务和线程这两个概念总是混在一起。站在实现和更抽象的角度,这两者应该分开,所以编写代码时,你必须遵守规则。为了描述得更清楚:讨论要执行的工作时,用“任务”,引用到驱动任务的具体机制时,用“线程”。 如果只是在概念级别上讨论系统,则只用“任务”就行。
package com.jc.concurrency; /** * 一个线程可以等待一个线程完成,那就是用join * @author * */ class Sleeper extends Thread { private int duration; public Sleeper(String name, int sleepTime) { super(name); duration = sleepTime; start(); } public void run() { try { sleep(duration); } catch (InterruptedException e) { //异常捕获时会将Interrupted这个标志位重置为false,所以在这里输出false System.out.println(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted()); return; } System.out.println(getName() + " has awakened"); } } class Joiner extends Thread { private Sleeper sleeper; public Joiner(String name, Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); } public void run() { try { sleeper.join(); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println(getName() + " join completed"); } } public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); Joiner dopey = new Joiner("Dopey", sleepy), doc = new Joiner("Doc", grumpy); grumpy.interrupt(); } }捕获异常在main方法是无法捕获到线程里的异常。为解决这个问题,我们修改Executor产生线程的方式。Java SE5中的新接口:Thread.UncaughtExceptionHandler
package com.jc.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Task that prints the thread running it and that thread's uncaught-exception handler,
 * then throws a {@link RuntimeException} that nothing in the task catches.
 *
 * <p>Part of an example that uses {@link Thread.UncaughtExceptionHandler} to deal with
 * exceptions thrown by threads.
 */
class ExceptionThread2 implements Runnable {
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("run() by " + t);
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

/** Handler that reports the failed thread and the exception on standard output. */
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // The separator is the possessive "'s"; the earlier literal had a stray double quote.
        System.out.println("caught " + t + "'s " + e);
    }
}

/**
 * Thread factory that installs a {@link MyUncaughtExceptionHandler} on every thread it
 * creates, logging each step.
 */
class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " creating new Thread");
        Thread t = new Thread(r);
        System.out.println("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}

/**
 * Runs {@link ExceptionThread2} on a cached thread pool whose threads come from
 * {@link HandlerThreadFactory}, so the task's exception reaches the installed handler.
 *
 * <p>In the sample output below, the factory is asked for a second thread (Thread-1)
 * before the handler reports Thread-0's exception.
 */
public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
    }
}
/*
 * output:
 *
 * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
 * created Thread[Thread-0,5,main]
 * eh = com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e
 * run() by Thread[Thread-0,5,main]
 * eh = com.jc.concurrency.MyUncaughtExceptionHandler@70dea4e
 * com.jc.concurrency.HandlerThreadFactory@4e25154f creating new Thread
 * created Thread[Thread-1,5,main]
 * eh = com.jc.concurrency.MyUncaughtExceptionHandler@5490c2f5
 * caught Thread[Thread-0,5,main]'s java.lang.RuntimeException
 */

还可以设置默认异常处理器:
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 设置默认的线程异常处理类 * @author * */ public class SettingDefaultHandler { public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); } }线程状态(Thread state)1.新建(new):一个线程可以处于四种状态之一:新建(new),就绪(Runnable),阻塞(Blocked),死亡(Dead)。
通过调用wait()使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent类库中等价的signal()或signalAll()消息),线程才会进入就绪状态。
package com.jc.concurrency; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 中断处于阻塞状态的线程例子 * 发现只有sleep()操作的才能中断,其余的io和同步都不能被中断 * @author * */ class SleepBlocked implements Runnable { public void run() { try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { System.out.println("InterruptedException"); } System.out.println("Exiting SleepBlocked.run()"); } } class IOBlocked implements Runnable { private InputStream in; public IOBlocked(InputStream is) { in = is; } public void run() { try { System.out.println("Waiting for read():"); in.read(); } catch (IOException e) { if (Thread.currentThread().isInterrupted()) { System.out.println("Interrupted from blocked I/O"); } else { throw new RuntimeException(e); } } System.out.println("Exiting IOBlocked.run()"); } } class SynchronizedBlocked implements Runnable { public synchronized void f() { while (true) // Never releases lock Thread.yield(); } public SynchronizedBlocked() { new Thread() { public void run() { f(); // Lock acquired by this thread } }.start(); } public void run() { System.out.println("Trying to call f()"); f(); System.out.println("Exiting SynchronizedBlocked.run()"); } } public class Interrupting { private static ExecutorService exec = Executors.newCachedThreadPool(); static void test(Runnable r) throws InterruptedException { Future> f = exec.submit(r); TimeUnit.MILLISECONDS.sleep(100); System.out.println("Interrupting " + r.getClass().getName()); f.cancel(true); // Interrupts if running System.out.println("Interrupt sent to " + r.getClass().getName()); } public static void main(String[] args) throws Exception { test(new SleepBlocked()); test(new IOBlocked(System.in)); test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3); System.out.println("Aborting with System.exit(0)"); System.exit(0); // ... 
since last 2 interrupts failed } }发现只有sleep()操作的才能中断,其余的io和同步都不能被中断。所以有个比较不优雅,但有效的关闭方式:
package com.jc.concurrency; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 中断IO阻塞的线程的方式:关闭资源 * @author * */ public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InputStream socketInput = new Socket("localhost", 8080).getInputStream(); exec.execute(new IOBlocked(socketInput)); exec.execute(new IOBlocked(System.in)); TimeUnit.MILLISECONDS.sleep(100); System.out.println("Shutting down all threads"); exec.shutdownNow(); TimeUnit.SECONDS.sleep(1); System.out.println("Closing " + socketInput.getClass().getName()); socketInput.close(); // Releases blocked thread TimeUnit.SECONDS.sleep(1); System.out.println("Closing " + System.in.getClass().getName()); System.in.close(); // Releases blocked thread } }之所以要sleep,是想要interrupt都传到各个线程里面。以达到中断的效果。
/** * NIO提供了优雅的I/O中断 * @author * */ class NIOBlocked implements Runnable { private final SocketChannel sc; public NIOBlocked(SocketChannel sc) { this.sc = sc; } public void run() { try { System.out.println("Waiting for read() in " + this); sc.read(ByteBuffer.allocate(1)); } catch (ClosedByInterruptException e) { System.out.println("ClosedByInterruptException" + this); } catch (AsynchronousCloseException e) { System.out.println("AsynchronousCloseException" + this); } catch (IOException e) { throw new RuntimeException(e); } System.out.println("Exiting NIOBlocked.run() " + this); } } public class NIOInterruption { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); ServerSocket server = new ServerSocket(8080); InetSocketAddress isa = new InetSocketAddress("localhost", 8080); SocketChannel sc1 = SocketChannel.open(isa); SocketChannel sc2 = SocketChannel.open(isa); System.out.println(sc1); System.out.println(sc2); Future> f = exec.submit(new NIOBlocked(sc1)); exec.execute(new NIOBlocked(sc2)); exec.shutdown(); TimeUnit.SECONDS.sleep(1); // Produce an interrupt via cancel: f.cancel(true); TimeUnit.SECONDS.sleep(1); // Release the block by closing the channel: sc2.close(); } }SleepBlocked例子展示了synchronized的锁是不可以中断,这是很危险的。所以ReentrantLock提供了可中断的能力
package com.jc.concurrency; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * SleepBlocked例子展示了synchronized的锁是不可以中断,这是很危险的。 * 所以ReentrantLock提供了可中断的能力 * @author * */ class BlockedMutex { private Lock lock = new ReentrantLock(); public BlockedMutex() { // Acquire it right away, to demonstrate interruption // of a task blocked on a ReentrantLock: lock.lock(); } public void f() { try { // This will never be available to a second task lock.lockInterruptibly(); // Special call System.out.println("lock acquired in f()"); } catch (InterruptedException e) { System.out.println("Interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable { BlockedMutex blocked = new BlockedMutex(); public void run() { System.out.println("Waiting for f() in BlockedMutex"); blocked.f(); System.out.println("Broken out of blocked call"); } } public class Interrupting2 { public static void main(String[] args) throws Exception { Thread t = new Thread(new Blocked2()); t.start(); TimeUnit.SECONDS.sleep(1); System.out.println("Issuing t.interrupt()"); t.interrupt(); } }/**output: Waiting for f() in BlockedMutex Issuing t.interrupt() Interrupted from lock acquisition in f() Broken out of blocked call **/在没有阻塞的语句时,通过Thread.interrupted()判断线程被中断:
package com.jc.concurrency; import java.util.concurrent.TimeUnit; /** * 在没有阻塞的语句时,通过Thread.interrupted()判断线程被中断 * @author * */ class NeedsCleanup { private final int id; public NeedsCleanup(int ident) { id = ident; System.out.println("NeedsCleanup " + id); } public void cleanup() { System.out.println("Cleaning up " + id); } } class Blocked3 implements Runnable { private volatile double d = 0.0; public void run() { // try { while (!Thread.interrupted()) { // point1 NeedsCleanup n1 = new NeedsCleanup(1); // Start try-finally immediately after definition // of n1, to guarantee proper cleanup of n1: try { System.out.println("Sleeping"); // TimeUnit.SECONDS.sleep(1); // point2 NeedsCleanup n2 = new NeedsCleanup(2); // Guarantee proper cleanup of n2: try { System.out.println("Calculating"); // A time-consuming, non-blocking operation: for (int i = 1; i < 2500000; i++) d = d + (Math.PI + Math.E) / d; System.out.println("Finished time-consuming operation"); } finally { n2.cleanup(); } } finally { n1.cleanup(); } } System.out.println("Exiting via while() test"); // } catch (InterruptedException e) { // System.out.println("Exiting via InterruptedException"); // } } } public class InterruptingIdiom { public static void main(String[] args) throws Exception { if (args.length != 1) { System.out.println("usage: java InterruptingIdiom delay-in-mS"); System.exit(1); } Thread t = new Thread(new Blocked3()); t.start(); TimeUnit.MILLISECONDS.sleep(new Integer(args[0])); t.interrupt(); } }线程协作 wait()和notify() wait()、notify()以及notifyAll()有一个比较特殊的方面,那就是这些方法都是基类Object的方法,而不是Thread的一部分。一开始或许有这种困惑,觉得很奇怪:明明是线程的功能,为啥要放在Object里?那是因为这些方法需要操作锁:当一个任务在方法里遇到wait()的调用时,线程的执行被挂起(阻塞状态),对象上的锁会被释放。因此wait()方法需放在同步控制块里(与之相对比的是sleep(),因为不用操作锁,所以可以放在非同步控制块里,而且还是Thread的方法)。如果在非同步控制块里调用这些方法,程序能通过编译,但运行时会抛IllegalMonitorStateException异常。例子:
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * wait()和notifyAll()例子,notifyAll会将该对象的wait()方法所阻塞的线程 * @author * */ class Car { private boolean waxOn = false; public synchronized void waxed() { waxOn = true; // Ready to buff notifyAll(); } public synchronized void buffed() { waxOn = false; // Ready for another coat of wax notifyAll(); } public synchronized void waitForWaxing() throws InterruptedException { while (waxOn == false) wait(); } public synchronized void waitForBuffing() throws InterruptedException { while (waxOn == true) wait(); } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { System.out.print("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); // Run for a while... exec.shutdownNow(); // Interrupt all tasks } }notify()和nofityAll()因为可能有多个任务在单个Car对象上处于wait()状态,因此调用nofityAll()比只调用notify()要更安全。所以上面那个程序,只有一个任务,因此可以使用notify()来代替notifyAll()。
使用notify()而不是notifyAll()是一种优化。除非确切知道notify()会唤醒哪个任务,否则还是用notifyAll()更保险。
在有关Java的线程机制的讨论中,有一个令人困惑的描述:notifyAll()将唤醒“所有正在等待的任务”。其实更准确是:当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒:package com.jc.concurrency; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒 * @author * */ class Blocker { synchronized void waitingCall() { try { while (!Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() + " "); } } catch (InterruptedException e) { // OK to exit this way } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); } } class Task implements Runnable { static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } } class Task2 implements Runnable { // A separate Blocker object: static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } } public class NotifyVsNotifyAll { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) exec.execute(new Task()); exec.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; public void run() { if (prod) { System.out.print(" notify() "); Task.blocker.prod(); prod = false; } else { System.out.print(" notifyAll() "); Task.blocker.prodAll(); prod = true; } } }, 400, 400); // Run every .4 second TimeUnit.SECONDS.sleep(5); // Run for a while... timer.cancel(); System.out.println(" Timer canceled"); TimeUnit.MILLISECONDS.sleep(500); System.out.print("Task2.blocker.prodAll() "); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500); System.out.println(" Shutting down"); exec.shutdownNow(); // Interrupt all tasks } }使用wait()和notifyAll()实现生产者和消费者:一个饭店,有一个厨师和一个服务员,这个服务员必须等待厨师准备好食物,当厨师准备好后就会通知服务员,之后服务员上菜,然后服务员继续等待。
package com.jc.concurrency; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 简单的生产者消费者例子 * 此例子有点局限因为不能有多线程的生产者、多线程的消费者。 * 这例子仅仅展示如果使用wait()和notify()保证有序 * @author * */ class Meal { private final int orderNum; public Meal(int orderNum) { this.orderNum = orderNum; } public String toString() { return "Meal " + orderNum; } } class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson(Restaurant r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { synchronized (this) { while (restaurant.meal == null) wait(); // ... for the chef to produce a meal } System.out.println("Waitperson got " + restaurant.meal); synchronized (restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); // Ready for another } } } catch (InterruptedException e) { System.out.println("WaitPerson interrupted"); } } } class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant r) { restaurant = r; } public void run() { try { while (!Thread.interrupted()) { synchronized (this) { while (restaurant.meal != null) wait(); // ... for the meal to be taken } if (++count == 10) { System.out.println("Out of food, closing"); restaurant.exec.shutdownNow(); } System.out.println("Order up! "); synchronized (restaurant.waitPerson) { restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } catch (InterruptedException e) { System.out.println("Chef interrupted"); } } } public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant() { exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } }使用显式锁Lock和Condition对象:
package com.jc.concurrency.waxomatic2; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 使用显式的Lock和Condition对象来修改WaxOMatic例子 * @author * */ class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean waxOn = false; public void waxed() { lock.lock(); try { waxOn = true; // Ready to buff condition.signalAll(); } finally { lock.unlock(); } } public void buffed() { lock.lock(); try { waxOn = false; // Ready for another coat of wax condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing() throws InterruptedException { lock.lock(); try { while (waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing() throws InterruptedException { lock.lock(); try { while (waxOn == true) condition.await(); } finally { lock.unlock(); } } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { System.out.print("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while (!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off! 
"); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic2 { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }基于Lock和链表存储结构写的一个消息队列:
package com.jc.framework.queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class JcBlockingQueueExecutorService的shutdown{ private JcQueueData head; private JcQueueData tail; private int size = 0; private int maxSize = Integer.MAX_VALUE; private final Lock lock; private final Condition full; private final Condition empty; public JcBlockingQueue() { lock = new ReentrantLock(); full = lock.newCondition(); //角度是生产者 empty = lock.newCondition(); //角度是消费者 } public void enQueue(T t) throws InterruptedException { lock.lock(); if (size == maxSize) { full.await(); } if (head == null) { head = new JcQueueData<>(t, null); tail = head; size++; empty.signalAll(); lock.unlock(); return; } JcQueueData jcQueueData = new JcQueueData<>(t, null); tail.setNext(jcQueueData); tail = jcQueueData; size++; if (size == 1) empty.signalAll(); lock.unlock(); } public T deQueue() throws InterruptedException { lock.lock(); while (head == null) { empty.await(); } T t = head.getData(); if (head.next != null) { JcQueueData next = head.next; head.next = null; head = next; } else { head = null; tail = null; } size--; if(size==maxSize-1) full.signalAll(); lock.unlock(); return t; } public int size() { return size; } private class JcQueueData { private T data; private JcQueueData next; public JcQueueData(T data, JcQueueData next) { this.data = data; this.next = next; } public T getData() { return data; } public void setData(T data) { this.data = data; } public JcQueueData getNext() { return next; } public void setNext(JcQueueData next) { this.next = next; } } } ExecutorService的shutdown方法,这有可能还有工作正在执行或准备执行,这情况下,它只是通知线程池再没有更多任务需要增加到它的内部队列,而且一旦完成所有等待的工作,就应当关闭。
