资讯专栏INFORMATION COLUMN

Java ExecutorService线程池中的小坑——关于线程池中抛出的异常处理

wow_worktile / 2639人阅读

摘要:先看写的简略的代码线程池中发现异常,被中断线程池中发现异常,被中断我这是一个订单处理流程,主要用到了一个方法,就是。好了,以上就是对线程池异常捕捉的一个记录。

开发自己的项目有一段时间了,因为是个长时间跑的服务器端程序,所以异常处理显得尤为重要。
对于异常的抓取和日志(狭义上的日志)的分析一点都不能落下。

我们使用了Java自带的Executor模块,我只是稍微看了下Executors当中三个线程池的实现(策略为:Fixed, Cached, Schedule),其实光看名字就可以了解各自的一些策略信息。OK,这一次我需要一种策略合并Fixed和Cached的两种特点的自定义Executor。其实很简单,给Cached设置一个上线就是了。注意他们的同步队列使用的不同,用LinkedBlockingQueue是个不错的选择,至于BlockingQueue的实现可以自行谷歌(以后再记吧)。

先看写的简略的代码

package com.zjseek.recharge.core;

import com.zjseek.recharge.exception.SKErrorCode;
import com.zjseek.recharge.exception.SKOrderState;
import com.zjseek.recharge.model.OrderModel;
import com.zjseek.recharge.service.OrderService;
import org.apache.log4j.Logger;

import java.sql.Timestamp;
import java.util.concurrent.*;

/**
 * Created by geminiwen on 14-6-28.
 */
public class OrderExceptionThreadExecutor extends ThreadPoolExecutor {
    private Logger logger = Logger.getLogger(OrderExceptionThreadExecutor.class);
    private OrderService orderService;

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        init();
    }

    public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        init();
    }

    private void init() {
        this.orderService = new OrderService();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Future f = (Future) r;
        try {
            f.get();
        } catch (InterruptedException e) {
            logger.error("线程池中发现异常,被中断", e);
        } catch (ExecutionException e) {
            logger.error("线程池中发现异常,被中断", e);
        }

    }
}

我这是一个订单处理流程,主要用到了一个protected方法,就是afterExecute。一看这个函数的样子,想当然的以为如果线程池中出了问题,异常自然回在第二个参数t中传过来。
也许的确是这样的,但是这里有一个区别。
我们知道ExecutorServcie中执行一个Runnable有两个方法,两个分别是

public void execute(Runnable command);
public  Future submit(Runnable task, T result);

别看接受的参数差不多,其实submit最后是调用的execute的,而且在调用execute前,对task进行了一次封装,变成了RunnableFuture(它是接口,继承了RunnableFuture实际是一个实现类FutureTask)。

OK,对于实际操作Runnable的不同,暂时说到这,看下execute方法做了什么事
execute方法对进来的Runnable又包装成了worker然后进入runWorker
runWorker方法中有这么几行

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
       task.run();
    } catch (RuntimeException x) {
       thrown = x; throw x;
    } catch (Error x) {
       thrown = x; throw x;
    } catch (Throwable x) {
       thrown = x; throw new Error(x);
    } finally {
       afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

好了,到了最关键的afterExecute这个步骤,我满心以为这里所有的异常都会通过thrown传递进来,看来我还是太年轻了,之前我们分析过,这个Runnable已经被submit封装成了FutureTask,那么这个task.run()除了我们自己定义的run任务之外,到底还干了啥呢?

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

OK,这段源码摘自FutureTask中的run方法,实际我们自己定义的任务已经变成了Callable:

   public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

从它的构造函数就可以看出来。

然后我们在上面实际运行task的地方其实是c.call()这一句。

  

result = c.call();

我们写的任务全部在这句代码里面执行完毕了,看看外面都wrap了啥? OK 我们所有的Throwable全部已经被setException吃掉了,怎么还会抛出到外面那层的execute中呢?
所以我之前实验的时候,在submit中提交任务无论任务怎么抛异常,在afterExecute中的第二个参数是取不到的,原因就在这。

再回头看看针对submit改造的函数

protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    Future f = (Future) r;
    try {
        f.get();
    } catch (InterruptedException e) {
        logger.error("线程池中发现异常,被中断", e);
    } catch (ExecutionException e) {
        logger.error("线程池中发现异常,被中断", e);
    }

}

当然,这里已经默认r是实现Future接口了。通过FutureTask的get方法,能把刚刚setException中的异常给抛出来,这样我们就能真的拿到这些异常了。

结论

如果我们关心线程池执行的结果,则需要使用submit来提交task,那么在afterExecute中对异常的处理也需要通过Future接口调用get方法去取结果,才能拿到异常,如果我们不关心这个任务的结果,可以直接使用ExecutorService中的execute方法(实际是继承Executor接口)来直接去执行任务,这样的话,我们的Runnable没有经过多余的封装,在runWorker中得到的异常也直接能在afterExecute中捕捉。

好了,以上就是对线程池异常捕捉的一个记录。想想应该不难,今天也是偶然机会看到的。今天在开发中碰到PHP锁的问题,头疼死了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64138.html

相关文章

  • Java 并发学习笔记

    摘要:方法可以将当前线程放入等待集合中,并释放当前线程持有的锁。此后,该线程不会接收到的调度,并进入休眠状态。该线程会唤醒,并尝试恢复之前的状态。 并发 最近重新复习了一边并发的知识,发现自己之前对于并发的了解只是皮毛。这里总结以下Java并发需要掌握的点。 使用并发的一个重要原因是提高执行效率。由于I/O等情况阻塞,单个任务并不能充分利用CPU时间。所以在单处理器的机器上也应该使用并发。为...

    DrizzleX 评论0 收藏0
  • 任务异常导致线程中的线程变为waiting状态

    摘要:通过搜索引擎了解到以下观点提交到线程池的任务如果抛出异常会导致线程挂掉,遂将提交到线程池的任务中可能出现的异常进行了处理,确实解决了问题。 背景 项目中存在一些定时任务来更新数据库表,借助了线程池提供的一些能力,线上环境偶尔会出现网络波动导致服务实例无法连上数据库,只要出现了这种情况,就会导致数据不会再被更新,通过一些命令发现更新数据库的线程池中的所有线程都处于waiting状态。通过...

    fyber 评论0 收藏0
  • Java线程池简单总结

    摘要:本文主要内容为简单总结中线程池的相关信息。方法簇方法簇用于创建固定线程数的线程池。三种常见线程池的对比上文总结了工具类创建常见线程池的方法,现对三种线程池区别进行比较。 概述 线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。在单核处理器架构下,操作系统一般使用分时的方式实现多线程;在多核处理器架构下,多个线程能够...

    CoorChice 评论0 收藏0
  • 美团面试题:Java-线程池 ThreadPool 专题详解

    摘要:去美团面试,问到了什么是线程池,如何使用,为什么要用以下做个总结。二线程池线程池的作用线程池作用就是限制系统中执行线程的数量。真正的线程池接口是。创建固定大小的线程池。此线程池支持定时以及周期性执行任务的需求。 去美团面试,问到了什么是线程池,如何使用,为什么要用,以下做个总结。关于线程之前也写过一篇文章《高级面试题总结—线程池还能这么玩?》 1、什么是线程池:  java.util...

    enrecul101 评论0 收藏0
  • 美团面试题:Java-线程池 ThreadPool 专题详解

    摘要:去美团面试,问到了什么是线程池,如何使用,为什么要用以下做个总结。二线程池线程池的作用线程池作用就是限制系统中执行线程的数量。真正的线程池接口是。创建固定大小的线程池。此线程池支持定时以及周期性执行任务的需求。 去美团面试,问到了什么是线程池,如何使用,为什么要用,以下做个总结。关于线程之前也写过一篇文章《高级面试题总结—线程池还能这么玩?》 1、什么是线程池:  java.util...

    wujl596 评论0 收藏0

发表评论

0条评论

wow_worktile

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<