资讯专栏INFORMATION COLUMN

Java Executors 源码分析

itvincent / 2131人阅读

摘要:表示一个异步任务的结果,就是向线程池提交一个任务后,它会返回对应的对象。它们分别提供两个重要的功能阻塞当前线程等待一段时间直到完成或者异常终止取消任务。此时,线程从中返回,然后检查当前的状态已经被改变,随后退出循环。

0 引言

前段时间需要把一个C++的项目port到Java中,因此时隔三年后重新熟悉了下Java。由于需要一个通用的线程池,自然而然就想到了Executors。

用了后,感觉很爽... 于是忍不住抠了下源码。因此就有了这篇学习笔记。

言归正传,Java Executor是一个功能丰富,接口设计很好的,基于生产者-消费者模式的通用线程池。这种线程池的设计思想也在很多地方被应用。

在这篇文章中,我并不打算介绍java线程池的使用,生产者-消费者模式,并发编程基本概念等。

通常来说,一个线程池的实现包括四个部分:

执行任务的线程

用于封装任务的task对象

存储任务的数据结构

线程池本身

1 Thread

Thread 并不是concurrent包的一部分。Thread包含着name, priority等成员和对应的操作方法。

它是继承自runable的,也就是说线程的入口函数是run。它的继承体系和重要操作函数如下图:

它实现了一系列包括sleep, yield等静态方法。以及获取当前线程的静态方法currentThread()。这些都是native方法。

值得注意的是它的中断机制(虽然它也实现了suspend和resume方法,但是这两个方法已被弃用):

通过调用interrupt来触发一个中断

isInterrupted() 用来查询线程的中断状态

interrupted() 用来查询并清除线程的中断状态

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

在默认的情况下,blocker (Interruptible 成员变量)的值为null, 这时调用interrupt,仅仅是调用interrupt0设置一个标志位。

而如果blocker的值不为null,则会调用其interrupt方法实现真正的中断。

(关于blocker值何时被设置,在后面会看到一个使用场景。)

当线程处于可中断的阻塞状态时,比如说阻塞在sleep, wait, join,select等操作时,调用interrupt方法会让线程从阻塞状态退出,并抛出InterruptedException。

值得注意的一点是:interrupt让我们从阻塞的方法中退出,但线程的中断状态却并不会被设置

try {
    Thread.sleep(10);
}
catch (InterruptedException e) {
    System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted());
}

如上述示例代码,此时你得到的输出是: IsInterrupted : false 。这是一个有点令人意外的地方。
上述代码并不是一个好的示例,因为interrupt被我们“吃”掉了!除非你明确的知道这是你想要的。否则的话请考虑在异常捕获中(catch段中)加上:

Thread.currentThread.interrupt();
2. Task

Java可执行的接口类有两种,Runnable和Callable,它们的区别是Callable可以带返回值,一个需要实现Run()方法,另一个需要实现带返回值的Call() 方法。

在java.util.concurret中还有另外一个接口类Future。

Future表示一个异步任务的结果,就是user code向线程池提交一个任务后,它会返回对应的 Future对象。用以观察任务执行的状态(isCancelled, isDone),取消任务(Cancel)或者等待任务执行(get, timeout get)。

如上图,RunnableFuture是一个中间类,它将Runnable和Future的功能糅合到一起。FutureTask 则是真正的实现。

FutureTask

FutureTask可以从一个Runnable和Callable构造,当通过Runnable构造时,它会调用Excutors.callable接口将其转为Callable对象保存起来。

从上面的类图中可以看出,FutureTask除了简单的状态查询等接口外,还具有两个重要的接口:get()get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)

它们分别提供两个重要的功能:阻塞(当前线程)等待(一段时间)直到task完成或者异常终止;取消任务。

任务取消

一个任务具有三种状态:尚未运行,正在运行,已经执行完毕。

在调用cancel后,如果任务处于已经执行完毕了,则不需要做任何事情直接返回;

如果任务尚未运行,将其状态设为cancelled;
如果任务正在执行,而且user以cancel(true)的方式取消这个任务。那么FutureTask会通过调用Thread.interrupt来终止当前任务。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务已经完成或者被中断等其他状态
    if (state != NEW)
        return false;
    if (mayInterruptIfRunning) {
        // 正在运行,或者尚未运行
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if (t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    // 设置cancel标志位
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    finishCompletion();
    return true;
}

注意到: FutureTask并没有一个RUNNING的状态来标识该任务正在执行。正常的情况下,任务从开始创建直到运行完毕,这段过程的状态都是NEW。

阻塞等待

user code可以调用get() 接口等待任务完成或者调用get(long, TimeUnit)等待一段时间。但get()接口被调用,当前的线程将被挂起,直到条件满足(任务完成或者异常退出)。

在前文中我们了解到,Thread并没有提供挂起和阻塞的方法。在这里,Java利用LockSupport类来实现目的。(我猜测其中用了类似条件变量的方法来实现)。

park

LockSupport也属于concurrent。FutureTask利用它的park (parkNanos)和unpark方法来实现线程的挂起和恢复:


public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }

其中parkNanos跟park方法并无本质区别,只是多了一个timeout参数。FutureTask分别用它们来实现get和timeout的get。

注意到上面的setBlocker方法了吗?没错,它就是给在上文Thread.interrupt方法中出现过的Thread成员变量blocker赋值。从这我们可以看出,它是可中断的。

而它真正实现挂起的则是依赖unsafe类。unsafe类在concurrent中频繁出现,但sun去并不建议使用它。

它除了提供park,unpark方法外,还提供了一些内存和同步原语。比如CAS等。

多个等待者

调用get()的线程可以是一个,也可以是多个。为了能够在恰当的时机将它们一一恢复,FutureTask内部需要维护一个链表来记录所有的等待线程:waiters.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
get 全貌

至此,我们终于了解get的全貌了。get会调用awaitDone方法来实现阻塞。当然,只有两个状态需要处理:NEW, COMPLETING。

NEW的状态在前文已经有介绍过。COMPLETING状态通常持续较短,在FutureTask 内部的callable 的call方法调用完毕后,会需要将call的返回值设置到outcome这个成员变量。随后将状态设为NORMAL。这期间的状态就是COMPLETING。

显而易见,对于这种状态,我们只需要调用yield让出线程资源,使得FutureTask完成这一过程即可。


private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet 2 Thread.yield(); else if (q == null) // 3 q = new WaitNode(); else if (!queued) // 4 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 5 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 6 LockSupport.park(this); } }

当任务处于NEW状态正在被执行时,其他线程调用get而进入awaitdone函数。

此时的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。

它会首先分配一个WaitNode对象 --> 把它插入到waiters链表的表头 --> 然后开始等待。那么park函数何时返回呢?

对应的unpark被调用(或者在这之前已经被调用)

如果设置了timeout的,会在时间到达后退出。

被中断。

其他异常。

等待线程恢复

当任务执行完毕(或者被cancel)时,FutureTask会调用最终调用finishcompletion,改函数会改变FutureTask状态,并调用LockSupport.unpark方法。

此时,awaitDone线程从park中返回,然后检查当前的状态已经被改变,随后退出for循环。

线程安全

FutureTask是会被多个线程访问的,涉及到临界区的保护,但是其内部却并没有任何的锁操作。而在该类定义的末尾,有这样的代码。

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

这段代码会在类被加载时执行一次。注意到它利用getDeclaredField反射机制来保存了三个offset:
stateOffset,runnerOffset,waitersOffset分别对应着state,runner,waiters这三个成员的偏移量。

FutureTask真是对这三个成员变量进行CAS操作来保证原子性和无锁化的。实现CAS的类正是上文出现过的sun.misc.Unsafe类。

UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())

第一个参数是对象指针,第二个是偏移量,第三个是旧值,最后一个是新值。详细可参考Unsafe文档。

3. BlockingQueue

java实现了生产者-消费者模式的队列。由于队列的容量有限,因此涉及到在队列为空的时候取task和在队列已满的时候存task的策略,连同一系列的查询函数一起,BlockingQueue包含着11个静态方法。

BlockingQueue只是一个interface,它的实现类包括链表方式的LinkedBlockingQueue 、数组方式的ArrayBlockingQueue以及PriorityBlockingQueue等。

LinkedBlockingQueue

下面以LinkedBlockingQueue为例来了解一下它的实现。

LinkedBlockingQueue是一个FIFO的队列,它真正用来存储元素的节点类型是Node :

static class Node {
    E item;
    Node next;
    Node(E x) { item = x; }
}

对应的,在LinkedBlockingQueue中保存了头节点和尾节点 :

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
private transient Node head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node last;

在LinkedBlockingQueue中,Java使用了双锁机制,分别对头节点和尾节点加锁。这样取和存的操作就可以同时进行了。


/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

以take为例,获取并移除此队列的头部,在元素变得可用之前一直等待(可被打断)。


public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }

它将会一直阻塞在notEmpty.await()上,直到信号到达或者被中断。注意到它只需要对takeLock加锁,而无需对putLock加锁。

相应的,put操作也只需要锁上putLock就可以了。

有的操作则需要两个锁都锁上,比如说remove,因为我们不确定要删除的元素的位置。

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

可以看到LinkedBlockingQueue 并没有直接调用lock,而是通过fullyLock和fullyUnLock来加解锁以保证一致性,避免死锁:


/** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }

当然,双锁队列在插入第一个元素和最后一个元素出队的时候会有冲突。这里的解决办法是加了一个哨兵,开始的时候,头尾节点都指向这个哨兵,在随后的操作中,头结点始终指向哨兵,而尾节点指向真正有效的值。

4. Executors 类结构

有了前面这些零件,我们就可以开始组装线程池对象了。java里面Executors的真正实现类主要包括两个ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor通过实现其基类ScheduledExecutorService扩展了ThreadPoolExecutor类。

SheduledExecutorsService主要用于执行周期性的或者定时的任务。其他情况下我们更多使用ThreadPoolExecutor。

ThreadPoolExecutor

ThreadPoolExecutor总共有七个构造参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

从其注释和参数名不难猜测各个参数的用途。唯一有点麻烦的是corePoolSize, maximumPoolSize这两个参数的区别。你可以参考这里或者这里。

但大多数情况我们并不需要直接调用它的构造函数,在Executors里面定义了一系列的静态方法供我们使用。包括newFixedThreadPool、newSingleThreadExecutor等。

由于ThreadPoolExecutor是一个通用的线程池,因此它需要为各种各样的情况预留足够的接口。ThreadPoolExecutor除了提供丰富的接口外,还提供了一些“什么都不做”的函数,为user预留接口。
比如每个任务在执行之前会调用beforeExecute,执行完毕后又会调用afterExecute。又比如terminate用来通知用户代码该线程将要结束。

这些接口java都提供了及其丰富的文档。

Executor接口设计的目的或许也在于此,为简单的情况提供尽量简单的使用方法,同时为复杂的情况或者说高级用户提供足够多的接口。

一个不用担心的问题

在最初使用ThreadPoolExecutor 时候,用到FutrueTask的cancel接口,我总是担心一个问题:

由于cancel是依赖线程的interrupt方法来实现的,也就是说cancel的状态保持在线程中而不是task中。那么当这个线程执行下一个task会不会被影响?为了验证这一点,我做了个小小的实验:

public class InterruptTest
{
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread());
            System.out.println("before interrupt " + Thread.currentThread().isInterrupted());
            Thread.currentThread().interrupt();

            System.out.println("after interrupt " + Thread.currentThread().isInterrupted());
        }
    }
    public static void main(String[] str)
    {
        ExecutorService service = Executors.newFixedThreadPool(1);
        // MyTask task1 = new MyTask();
        Future future1 = service.submit(new InterruptTest.MyTask());
        Future future2 = service.submit(new InterruptTest.MyTask());
    }
}  

输出结果说明,我的担心是多余的:

Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true
Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true

其关键代码就在ThreadPoolExecutor.runWorker 方法中,线程的中断状态会被清除(shutDown例外)。

final void runWorker(Worker w) {
     ...
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    ...
}

参见 SO 的提问

其中Executors还有很多的东西,但是看看文章的长度,我决定把那些关于Executors的笔记先“藏”起来。

如果感兴趣的可以翻看源码: ThreadFactory, RejectHandler, worker, task, shutDown策略,锁机制... 看看ThreadPoolExecutor 把这些积木堆成一个房子的吧。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65307.html

相关文章

  • 使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解

    摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。 之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 评论0 收藏0
  • Java多线程进阶(四三)—— J.U.C之executors框架:Fork/Join框架(2)实现

    摘要:并不会为每个任务都创建工作线程,而是根据实际情况构造线程池时的参数确定是唤醒已有空闲工作线程,还是新建工作线程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我们...

    FingerLiu 评论0 收藏0
  • Java多线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java ThreadPoolExecutor 线程池源码分析

    摘要:线程池常见实现线程池一般包含三个主要部分调度器决定由哪个线程来执行任务执行任务所能够的最大耗时等线程队列存放并管理着一系列线程这些线程都处于阻塞状态或休眠状态任务队列存放着用户提交的需要被执行的任务一般任务的执行的即先提交的任务先被执行调度 线程池常见实现 线程池一般包含三个主要部分: 调度器: 决定由哪个线程来执行任务, 执行任务所能够的最大耗时等 线程队列: 存放并管理着一系列线...

    greatwhole 评论0 收藏0
  • 【源起Netty 外传】ScheduledThreadPoolExecutor源码解读

    引言 本文是源起netty专栏的第4篇文章,很明显前3篇文章已经在偏离主题的道路上越来越远。于是乎,我决定:继续保持…… 使用 首先看看源码类注释中的示例(未改变官方示例逻辑,只是增加了print输出和注释) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    Eastboat 评论0 收藏0

发表评论

0条评论

itvincent

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<