摘要:接下来就是会把任务提交到队列中给线程池调度处理因为主要关心的是这个线程怎么执行,异常的抛出和处理,所以我们暂时不解析多余的逻辑。
前言
今天遇到了一个bug,现象是,一个任务放入线程池中,似乎“没有被执行”,日志也没有打。
经过本地代码调试之后,发现在任务逻辑的前半段,抛出了NPE,但是代码外层没有try-catch,导致这个异常被吃掉。
这个问题解决起来是很简单的,外层加个try-catch就好了,但是这个异常如果没有被catch,线程池内部逻辑是怎么处理这个异常的呢?这个异常最后会跑到哪里呢?
带着疑问和好奇心,我研究了一下线程池那一块的源码,并且做了以下的总结。
源码分析项目中出问题的代码差不多就是下面这个样子
ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.submit(() -> { String pennyStr = null; Double penny = Double.valueOf(pennyStr); ... })
先进到newFixedThreadPool这个工厂方法中看生成的具体实现类,发现是ThreadPoolExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
再看这个类的继承关系,
再进到submit方法,这个方法在ExecutorService接口中约定,其实是在AbstractExectorService中实现,ThreadPoolExecutor并没有override这个方法。
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); }
对应的FutureTask对象的构造方法
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // state由volatile 修饰 保证多线程下的可见性 }
对应Callable 对象的构造方法
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); }
对应RunnableAdapter 对象的构造方法
/** * A callable that runs given task and returns given result * 一个能执行所给任务并且返回结果的Callable对象 */ static final class RunnableAdapterimplements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
总结上面的,newTaskFor就是把我们提交的Runnable 对象包装成了一个Future。
接下来就是会把任务提交到队列中给线程池调度处理:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
因为主要关心的是这个线程怎么执行,异常的抛出和处理,所以我们暂时不解析多余的逻辑。很容易发现,如果任务要被执行,肯定是进到了addWorker方法当中,所以我们再进去看,鉴于addWorker方法的很长,不想列太多的代码,我就摘了关键代码段:
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 实例化一个worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 从Worker对象的构造方法看,当这个thread对象start之后, // 之后实际上就是调用Worker对象的run() t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // Worker的构造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
我们再看这个ThreadPoolExecutor的内部类Worker对象:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ... }
看来真正执行任务的是在这个外部的runWorker当中,让我们再看看这个方法是怎么消费Worker线程的。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; // ==== 关键代码 start ==== try { // 很简洁明了,调用了任务的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 关键代码 end ==== } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
终于走到底了,可以看到关键代码中的try-catch block代码块中,调用了本次执行任务的run方法。
// ==== 关键代码 start ==== try { // 很简洁明了,调用了任务的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 关键代码 end ====
可以看到捕捉了异常之后,会再向外抛出,只不过再finally block 中有个afterExecute()方法,似乎在这里是可以处理这个异常信息的,进去看看
protected void afterExecute(Runnable r, Throwable t) { }
可以看到ThreadPoolExecutor#afterExecute()方法中,是什么都没做的,看来是让使用者通过override这个方法来定制化任务执行之后的逻辑,其中可以包括异常处理。
那么这个异常到底是抛到哪里去了呢。我在一个大佬的文章找到了hotSpot JVM处理线程异常的逻辑,
if (!destroy_vm || JDK_Version::is_jdk12x_version()) { // JSR-166: change call from from ThreadGroup.uncaughtException to // java.lang.Thread.dispatchUncaughtException if (uncaught_exception.not_null()) { //如果有未捕获的异常 Handle group(this, java_lang_Thread::threadGroup(threadObj())); { KlassHandle recvrKlass(THREAD, threadObj->klass()); CallInfo callinfo; KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass()); /* 这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法 template(dispatchUncaughtException_name, "dispatchUncaughtException") */ LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), KlassHandle(), false, false, THREAD); CLEAR_PENDING_EXCEPTION; methodHandle method = callinfo.selected_method(); if (method.not_null()) { JavaValue result(T_VOID); JavaCalls::call_virtual(&result, threadObj, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), uncaught_exception, THREAD); } else { KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass()); JavaValue result(T_VOID); JavaCalls::call_virtual(&result, group, thread_group, vmSymbols::uncaughtException_name(), vmSymbols::thread_throwable_void_signature(), threadObj, // Arg 1 uncaught_exception, // Arg 2 THREAD); } if (HAS_PENDING_EXCEPTION) { ResourceMark rm(this); jio_fprintf(defaultStream::error_stream(), " Exception: %s thrown from the UncaughtExceptionHandler" " in thread "%s" ", pending_exception()->klass()->external_name(), get_thread_name()); CLEAR_PENDING_EXCEPTION; } } }
代码是C写的,有兴趣可以去全文,根据英文注释能稍微看懂一点
http://hg.openjdk.java.net/jd...
可以看到这里最终会去调用Thread#dispatchUncaughtException方法:
/** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
/** * Called by the Java Virtual Machine when a thread in this * thread group stops because of an uncaught exception, and the thread * does not have a specific {@link Thread.UncaughtExceptionHandler} * installed. * */ public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { //可以看到会打到System.err里面 System.err.print("Exception in thread "" + t.getName() + "" "); e.printStackTrace(System.err); } } }
jdk的注释也说明的很清楚了,当一个线程抛出了一个未捕获的异常,JVM会去调用这个方法。如果当前线程没有声明UncaughtExceptionHandler成员变量并且重写uncaughtException方法的时候,就会看线程所属的线程组(如果有线程组的话)有没有这个类,没有就会打到System.err里面。
IBM这篇文章也提倡我们使用ThreadGroup 提供的 uncaughtException 处理程序来在线程异常终止时进行检测。
https://www.ibm.com/developer...总结 (解决方法)
从上述源码分析中可以看到,对于本篇的异常“被吃掉”的问题,有以下几种方法
用try-catch 捕捉,一般都是用这种
线程或者线程组对象设置UncaughtExceptionHandler成员变量
Thread t = new Thread(r); t.setUncaughtExceptionHandler( (t1, e) -> LOGGER.error(t1 + " throws exception: " + e)); return t;
override 线程池的afterExecute方法。
本篇虽然是提出问题的解决方法,但主旨还是分析源码,了解了整个过程中异常的经过的流程,希望能对您产生帮助。
参考https://www.jcp.org/en/jsr/de...
https://www.ibm.com/developer...
http://ifeve.com/%E6%B7%B1%E5...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/74301.html
摘要:先看写的简略的代码线程池中发现异常,被中断线程池中发现异常,被中断我这是一个订单处理流程,主要用到了一个方法,就是。好了,以上就是对线程池异常捕捉的一个记录。 开发自己的项目有一段时间了,因为是个长时间跑的服务器端程序,所以异常处理显得尤为重要。 对于异常的抓取和日志(狭义上的日志)的分析一点都不能落下。 我们使用了Java自带的Executor模块,我只是稍微看了下Executor...
摘要:进程线程与协程它们都是并行机制的解决方案。选择是任意性的,并在对实现做出决定时发生。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此线程池支持定时以及周期性执行任务的需求。 并发与并行的概念 并发(Concurrency): 问题域中的概念—— 程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件 并行(Parallel...
摘要:本文对多线程基础知识进行梳理,主要包括多线程的基本使用,对象及变量的并发访问,线程间通信,的使用,定时器,单例模式,以及线程状态与线程组。源码采用构建,多线程这部分源码位于模块中。通知可能等待该对象的对象锁的其他线程。 本文对多线程基础知识进行梳理,主要包括多线程的基本使用,对象及变量的并发访问,线程间通信,lock的使用,定时器,单例模式,以及线程状态与线程组。 写在前面 花了一周时...
摘要:基础问题的的性能及原理之区别详解备忘笔记深入理解流水线抽象关键字修饰符知识点总结必看篇中的关键字解析回调机制解读抽象类与三大特征时间和时间戳的相互转换为什么要使用内部类对象锁和类锁的区别,,优缺点及比较提高篇八详解内部类单例模式和 Java基础问题 String的+的性能及原理 java之yield(),sleep(),wait()区别详解-备忘笔记 深入理解Java Stream流水...
阅读 1993·2023-04-25 19:03
阅读 1179·2021-10-14 09:42
阅读 3346·2021-09-22 15:16
阅读 889·2021-09-10 10:51
阅读 1464·2021-09-06 15:00
阅读 2366·2019-08-30 15:55
阅读 434·2019-08-29 16:22
阅读 862·2019-08-26 13:49