资讯专栏INFORMATION COLUMN

【开发小记】 Java 线程池 之 被“吃掉”的线程异常(附源码分析和解决方法)

Soarkey / 2378人阅读

摘要:接下来就是会把任务提交到队列中给线程池调度处理因为主要关心的是这个线程怎么执行,异常的抛出和处理,所以我们暂时不解析多余的逻辑。

前言

今天遇到了一个bug,现象是,一个任务放入线程池中,似乎“没有被执行”,日志也没有打。

经过本地代码调试之后,发现在任务逻辑的前半段,抛出了NPE,但是代码外层没有try-catch,导致这个异常被吃掉。

这个问题解决起来是很简单的,外层加个try-catch就好了,但是这个异常如果没有被catch,线程池内部逻辑是怎么处理这个异常的呢?这个异常最后会跑到哪里呢?

带着疑问和好奇心,我研究了一下线程池那一块的源码,并且做了以下的总结。

源码分析

项目中出问题的代码差不多就是下面这个样子

ExecutorService threadPool = Executors.newFixedThreadPool(3);

threadPool.submit(() -> {
    String pennyStr = null;
    Double penny = Double.valueOf(pennyStr);
    ...
})

先进到newFixedThreadPool这个工厂方法中看生成的具体实现类,发现是ThreadPoolExecutor

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

再看这个类的继承关系,

再进到submit方法,这个方法在ExecutorService接口中约定,其实是在AbstractExectorService中实现,ThreadPoolExecutor并没有override这个方法。

 public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        return new FutureTask(runnable, value);
    }

对应的FutureTask对象的构造方法

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // state由volatile 修饰 保证多线程下的可见性
    }

对应Callable 对象的构造方法

public static  Callable callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter(task, result);
    }

对应RunnableAdapter 对象的构造方法

 /**
     * A callable that runs given task and returns given result
     * 一个能执行所给任务并且返回结果的Callable对象
     */
    static final class RunnableAdapter implements Callable {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

总结上面的,newTaskFor就是把我们提交的Runnable 对象包装成了一个Future

接下来就是会把任务提交到队列中给线程池调度处理:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

因为主要关心的是这个线程怎么执行,异常的抛出和处理,所以我们暂时不解析多余的逻辑。很容易发现,如果任务要被执行,肯定是进到了addWorker方法当中,所以我们再进去看,鉴于addWorker方法的很长,不想列太多的代码,我就摘了关键代码段:

private boolean addWorker(Runnable firstTask, boolean core) {

   ...
   boolean workerStarted = false;
   boolean workerAdded = false;
   Worker w = null;
   try {
      // 实例化一个worker对象
      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
            
              int rs = runStateOf(ctl.get());

              if (rs < SHUTDOWN ||
                  (rs == SHUTDOWN && firstTask == null)) {
                  if (t.isAlive()) // precheck that t is startable
                      throw new IllegalThreadStateException();
                  workers.add(w);
                  int s = workers.size();
                  if (s > largestPoolSize)
                      largestPoolSize = s;
                  workerAdded = true;
              }
          } finally {
              mainLock.unlock();
          }
          if (workerAdded) {
              // 从Worker对象的构造方法看,当这个thread对象start之后,
              // 之后实际上就是调用Worker对象的run()
              t.start();
              workerStarted = true;
          }
      }
   } finally {
      if (! workerStarted)
          addWorkerFailed(w);
   }
   return workerStarted;
}

// Worker的构造方法
  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
 

我们再看这个ThreadPoolExecutor的内部类Worker对象:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
   {
        ...

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

      ...
   }

看来真正执行任务的是在这个外部的runWorker当中,让我们再看看这个方法是怎么消费Worker线程的。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
   
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                // ==== 关键代码 start ====
                try {
                    // 很简洁明了,调用了任务的run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
                 // ==== 关键代码 end ====
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

终于走到底了,可以看到关键代码中的try-catch block代码块中,调用了本次执行任务的run方法。

// ==== 关键代码 start ====
try {
  // 很简洁明了,调用了任务的run方法
  task.run();
} catch (RuntimeException x) {
  thrown = x; throw x;
} catch (Error x) {
  thrown = x; throw x;
} catch (Throwable x) {
  thrown = x; throw new Error(x);
} finally {
  afterExecute(task, thrown);
}
// ==== 关键代码 end ====

可以看到捕捉了异常之后,会再向外抛出,只不过再finally block 中有个afterExecute()方法,似乎在这里是可以处理这个异常信息的,进去看看

protected void afterExecute(Runnable r, Throwable t) { }

可以看到ThreadPoolExecutor#afterExecute()方法中,是什么都没做的,看来是让使用者通过override这个方法来定制化任务执行之后的逻辑,其中可以包括异常处理。

那么这个异常到底是抛到哪里去了呢。我在一个大佬的文章找到了hotSpot JVM处理线程异常的逻辑,

if (!destroy_vm || JDK_Version::is_jdk12x_version()) {
    // JSR-166: change call from from ThreadGroup.uncaughtException to
    // java.lang.Thread.dispatchUncaughtException
    if (uncaught_exception.not_null()) {
      //如果有未捕获的异常
      Handle group(this, java_lang_Thread::threadGroup(threadObj()));
      {
        KlassHandle recvrKlass(THREAD, threadObj->klass());
        CallInfo callinfo;
        KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());
        /*  
         这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法
         template(dispatchUncaughtException_name,            "dispatchUncaughtException")                
        */
        LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,
                                           vmSymbols::dispatchUncaughtException_name(),
                                           vmSymbols::throwable_void_signature(),
                                           KlassHandle(), false, false, THREAD);
        CLEAR_PENDING_EXCEPTION;
        methodHandle method = callinfo.selected_method();
        if (method.not_null()) {
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  threadObj, thread_klass,
                                  vmSymbols::dispatchUncaughtException_name(),
                                  vmSymbols::throwable_void_signature(),
                                  uncaught_exception,
                                  THREAD);
        } else {
          KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  group, thread_group,
                                  vmSymbols::uncaughtException_name(),
                                  vmSymbols::thread_throwable_void_signature(),
                                  threadObj,           // Arg 1
                                  uncaught_exception,  // Arg 2
                                  THREAD);
        }
        if (HAS_PENDING_EXCEPTION) {
          ResourceMark rm(this);
          jio_fprintf(defaultStream::error_stream(),
                "
Exception: %s thrown from the UncaughtExceptionHandler"
                " in thread "%s"
",
                pending_exception()->klass()->external_name(),
                get_thread_name());
          CLEAR_PENDING_EXCEPTION;
        }
      }
    }

代码是C写的,有兴趣可以去全文,根据英文注释能稍微看懂一点

http://hg.openjdk.java.net/jd...

可以看到这里最终会去调用Thread#dispatchUncaughtException方法:

/**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.
     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }
/**
 * Called by the Java Virtual Machine when a thread in this
 * thread group stops because of an uncaught exception, and the thread
 * does not have a specific {@link Thread.UncaughtExceptionHandler}
 * installed.
 *
 */
public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
               //可以看到会打到System.err里面
                System.err.print("Exception in thread ""
                                 + t.getName() + "" ");
                e.printStackTrace(System.err);
            }
        }
    }

jdk的注释也说明的很清楚了,当一个线程抛出了一个未捕获的异常,JVM会去调用这个方法。如果当前线程没有声明UncaughtExceptionHandler成员变量并且重写uncaughtException方法的时候,就会看线程所属的线程组(如果有线程组的话)有没有这个类,没有就会打到System.err里面。

IBM这篇文章也提倡我们使用ThreadGroup 提供的 uncaughtException 处理程序来在线程异常终止时进行检测。

https://www.ibm.com/developer...
总结 (解决方法)

从上述源码分析中可以看到,对于本篇的异常“被吃掉”的问题,有以下几种方法

用try-catch 捕捉,一般都是用这种

线程或者线程组对象设置UncaughtExceptionHandler成员变量

  Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                (t1, e) -> LOGGER.error(t1 + " throws exception: " + e));
            return t;

override 线程池的afterExecute方法。

本篇虽然是提出问题的解决方法,但主旨还是分析源码,了解了整个过程中异常的经过的流程,希望能对您产生帮助。

参考

https://www.jcp.org/en/jsr/de...

https://www.ibm.com/developer...

http://ifeve.com/%E6%B7%B1%E5...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74301.html

相关文章

  • Java ExecutorService线程小坑——关于线程中抛出异常处理

    摘要:先看写的简略的代码线程池中发现异常,被中断线程池中发现异常,被中断我这是一个订单处理流程,主要用到了一个方法,就是。好了,以上就是对线程池异常捕捉的一个记录。 开发自己的项目有一段时间了,因为是个长时间跑的服务器端程序,所以异常处理显得尤为重要。 对于异常的抓取和日志(狭义上的日志)的分析一点都不能落下。 我们使用了Java自带的Executor模块,我只是稍微看了下Executor...

    wow_worktile 评论0 收藏0
  • 线程小记

    摘要:死亡状态有两个原因会导致线程死亡方法正常退出而自然死亡。一个未捕获的异常终止了方法而使线程猝死。注意,放入的线程不必担心其结束,超过不活动,其会自动被终止。线程间相互干扰描述了当多个线程访问共享数据时可能出现的错误。 线程 进程与线程的区别 线程是指进程内的一个执行单元,也是进程内的可调度实体。一个程序至少有一个进程,一个进程至少有一个线程。 线程的五大状态 新建状态(New):例如...

    suxier 评论0 收藏0
  • Java 并发方案全面学习总结

    摘要:进程线程与协程它们都是并行机制的解决方案。选择是任意性的,并在对实现做出决定时发生。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此线程池支持定时以及周期性执行任务的需求。 并发与并行的概念 并发(Concurrency): 问题域中的概念—— 程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件 并行(Parallel...

    mengera88 评论0 收藏0
  • Java线程核心技术梳理(源码)

    摘要:本文对多线程基础知识进行梳理,主要包括多线程的基本使用,对象及变量的并发访问,线程间通信,的使用,定时器,单例模式,以及线程状态与线程组。源码采用构建,多线程这部分源码位于模块中。通知可能等待该对象的对象锁的其他线程。 本文对多线程基础知识进行梳理,主要包括多线程的基本使用,对象及变量的并发访问,线程间通信,lock的使用,定时器,单例模式,以及线程状态与线程组。 写在前面 花了一周时...

    Winer 评论0 收藏0
  • 后台开发常问面试题集锦(问题搬运工,链接)

    摘要:基础问题的的性能及原理之区别详解备忘笔记深入理解流水线抽象关键字修饰符知识点总结必看篇中的关键字解析回调机制解读抽象类与三大特征时间和时间戳的相互转换为什么要使用内部类对象锁和类锁的区别,,优缺点及比较提高篇八详解内部类单例模式和 Java基础问题 String的+的性能及原理 java之yield(),sleep(),wait()区别详解-备忘笔记 深入理解Java Stream流水...

    spacewander 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<