资讯专栏INFORMATION COLUMN

并发基础

Warren / 737人阅读

摘要:关于,这个方法只会唤醒一个线程,并且不允许指定唤醒哪个线程,这是可能会发生死锁的。使用不可变对象降低了垃圾回收所产生的额外开销,同时也可以减少一些为了维护在并发中的的代码开销。

前言

跟着 The Java Tutorials 把并发的一些基础过了一遍,发现仍然还是有很多不清楚的地方,主要是因为平常没有机会实际应用吧,理论知识要有,实践也很重要,哪怕是写些小 demo 也可以的。

虽然主要是跟着 tutorials 的 concurrency 章节整理的,但这并不是官方文档的一个翻译哈,看到一个地方有前置技能不足的时候,就会穿插着一些对 API 的学习之类的。

还有就是。。segmentfault 对 markdown 的解析和我用的编辑器好像不太一致,引用的地方有点乱,实在懒得改了。。要是看着不舒服,可以看这个 并发基础(一)

Concurrency Foundation 1. Synchronized

Java 提供了两种基础的同步语法:1. synchronized 方法 2. synchronized 语句块

同步是在内置锁(intrinsic lock)或者叫做监视锁(monitor lock)的基础上建立的(API 中通常将其称为 monitor),这个 monitor 在同步的两个方面发挥作用: 1. 强制性的多带带访问对象 2. 建立 happens-before 关系

每个对象都有自己的内置锁,如果有线程想要访问这个对象,需要先获取这个对象的内置锁,那么此时其他线程是无法获取这个锁的,也就是无法访问这个对象,直至先前的线程释放锁。

synchronized 就是 Java 对内置锁的支持。

synchronized 方法

当线程调用 synchronized 方法时,就会自动获取该方法对象的 synchronized 锁,返回时才会释放,即使是由没被 catch 的异常返回的,也会释放锁。

那么,对于 static synchronized method 呢?这个让人有些迷惑,因为 static method 是和类关联的,而不是对象。

Intrinsic Locks and Synchronization (The Java™ Tutorials > Essential Classes > Concurrency)

In this case, the thread acquires the intrinsic lock for the Class object associated with the class. Thus access to class"s static fields is controlled by a lock that"s distinct from the lock for any instance of the class.

根据文档的描述,这种情况下,对类的静态域的访问和对类的普通实例的访问所获取的锁是不同的。

其实就是类锁和对象锁的区别,类锁也就是 static synchronized method 和 synchronized(xx.class){} 所使用的锁,对象锁就是 non-static synchronized method 和 synchronized(xxx){} 所使用的锁。

因为这两个锁是不同的,所以如果对同一个类 A,线程 1 想获取类 A 对象实例的锁,和线程 2 想获取类 A 的类锁,这两个线程是不存在竞争关系的。

下面看看 synchronized method 的具体作用:

作用:

同一个对象上的两个 synchronized 方法的调用不会交替进行,只有一个线程释放后其他线程才能获取这个锁

当一个 synchronized 方法退出后,它会自动和后面再对这个对象的 synchrnoized 方法的调用建立一个 happens-before 关系,这可以保证这个对象的状态的改变对其他线程都可见。

需要注意的是,构造器是不能被同步的,这是语法上面的要求,这好理解,构造器本来就是只有创建对象的线程才能访问,所以不需要同步。

这个 warning 值得注意

Synchronized Methods (The Java™ Tutorials > Essential Classes > Concurrency)

Warning: When constructing an object that will be shared between threads, be very careful that a reference to the object does not "leak" prematurely. For example, suppose you want to maintain a List called instances containing every instance of class. You might be tempted to add the following line to your constructor:

instances.add(this);

But then other threads can use instances to access the object before construction of the object is complete.

这是在多线程中很容易忽略的一个问题,就是在还没有构建对象完成时,其他线程就已经访问了这个对象。

synchronized 算是一种简单直接的方式去解决多线程之间的干扰和内存一致性错误,但是可能会带来活跃性的问题。

synchronized 语句和可重入的同步

这部分以及活跃性问题在之前的博客里有详细写了 线程安全性

2. Atomic Access

原子性的动作是一次完成的

Atomic Access (The Java™ Tutorials > Essential Classes > Concurrency)

  • Reads and writes are atomic for reference variables and for most primitive variables (all types except long and double).
  • Reads and writes are atomic for all variables declared volatile (including long and double variables).

翻译过来就是:

对引用变量和除了 long 和 double 的基本数据类型的读、写都是原子操作

对 volatile 修饰的变量的读、写也是原子操作。

这段我看了之后有点迷,因为之前的概念中就是 volatile 只能保证被修饰变量的可见性而不能保证原子性,为何文档中说 "Reads and writes are atomic for all variables declared volatile"?还有就是,对引用变量的读写是原子操作?

想了想,我觉得他想表达的意思是这样的:

首先,文档中说的读写,肯定指的是多带带的读、多带带的写,比如 int a = 1; 这句肯定是一个原子操作,就是向 a 写入值 1,这很容易看出来。

可如果是 b = a; 呢?这样就很容易让人感到迷惑了,猛然一看,这就是对引用变量的写入,但是,它应该是分为几步操作的,1. 读取 a 2. 写入 b,如果说多带带对 a 的读取和多带带对 b 的写入,这些都是原子操作,文档中说的是没有错的,可是实际上是对于 a 这个变量引用,它是随时有可能被更新的,也就是说 b = a; 可能被分解为 1. 读取 a 2. 写入 b 3. 写入 a,这个时候 b = a; 这个语句就会带来错误。

因为这种情况其实是很容易遇到的,所以给我的印象就是 b = a; 这种对引用变量的读写通常并不是原子操作,所以看到文档中这段话感到很迷惑,文档的描述是正确的,只不过他所界定的原子操作和读写的概念,指的是一种最窄的概念,并且,对于 b = a; 这种对引用变量的赋值,之所以出现问题,实际上是因为对变量 a 可见性没有保证,这就不能让原子操作背这个锅了。

文档中后面也说到了,虽然原子操作是完整的操作,使用它们可以不用担心线程干扰,但是有时仍然需要对原子操作进行同步,因为即使是原子操作也避免不了可能出现的内存一致性错误。

Atomic actions cannot be interleaved, so they can be used without fear of thread interference. However, this does not eliminate all need to synchronize atomic actions, because memory consistency errors are still possible.

所以说,使用简单的原子操作访问会比用 synchronized 访问变量会更有效,但是这就需要我们考虑到内存一致性的问题,因此需要使用哪种方式访问变量,就要看应用的规模和复杂程度了。

3. Guarded blocks

多线程之间肯定少不了协同工作,最常见的方式就是使用 Guarded block:

public void guardedJoy() {
    // Simple loop guard. Wastes
    // processor time. Don"t do this!
    while(!joy) {}
    System.out.println("Joy has been achieved!");
}

这就是一个 Guarded block,它会不断地检查 joy 这个值,而 joy 是由另一个线程所设置的。

不过,像上面这样的循环等待对资源也太浪费了,可以采用另一种方式:

public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which may not
    // be the event we"re waiting for.
    while(!joy) {
        try {
            wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Joy and efficiency have been achieved!");
}

这里使用到了 Object.wait()
Object (Java Platform SE 8 )

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).

它的作用就是让调用该方法的线程进入 WAITING 状态,直到有其他线程调用该对象的 notify() or notifyAll() 方法

The current thread must own this object"s monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object"s monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.

调用 wait() 之前需要先获取这个对象的锁,一旦调用之后就会释放对象的锁,然后等待,直到其他线程调用 notify() or notifyAll(),但也不是能让等待的线程立马从 wait() 返回,而是要等到调用 notify() or notifyAll() 的线程释放锁之后,等待线程才有机会从 wait() 返回,这个机会就是:当前线程可以重新获取锁的持有权限,然后才能继续执行。

还是回到 guardedJoy 那个例子,这个版本的 guardedJoy 变成了 synchronized 的了,为什么呢?刚刚介绍 wait() 方法时说到了,一个线程想要调用一个对象的 wait() 方法首先要获取到这个对象的锁,而获取锁的最简单的方式就是在 synchronized 方法里调用 wait() 啦

那么,当 Thread1 调用 wait() 时,就会释放锁然后挂起。当其他线程 Thread2 请求获取这个锁并成功后调用 notifyAll() 通知等待的所有线程,在 Thread2 释放这个锁后,等待的线程 Thread1 再次申请就可以重新获得这个锁,之后就可以从 wait() 方法返回继续执行了。

public synchronized notifyJoy() {
    joy = true;
    notifyAll();
}

关于 notify(),这个方法只会唤醒一个线程,并且不允许指定唤醒哪个线程,这是可能会发生死锁的。以生产者消费者问题为例,假设分别有 2 个consumer 和 producer,缓冲区大小为 1,有可能你唤醒的那个线程可能恰好是相同角色的线程,也就是说现在可能是一个 consumer 唤醒了另一个 consumer,本来 consumer 想要唤醒的是 producer,缓存区仍然为空的,但是 producer 却还在 wait,因为错过了被 consumer 唤醒的机会,从而就会产生死锁。

那么什么时候可以使用 notify() 呢?文档中是这样给的建议:
Guarded Blocks (The Java™ Tutorials >Essential Classes > Concurrency)

Note: There is a second notification method, notify, which wakes up a single thread. Because notify doesn"t allow you to specify the thread that is woken up, it is useful only in massively parallel applications — that is, programs with a large number of threads, all doing similar chores. In such an application, you don"t care which thread gets woken up.

所以 notify() 一般只在大规模并发应用(即系统有大量相似任务的线程)中使用。因为对于大规模并发应用,我们其实并不关心哪一个线程被唤醒。

看一下官网给的生产者消费者的示例程序:

数据通过 Drop 对象共享信息:

public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;
    // true 表示为空,需要等待生产数据
    // false 表示可以取数据了
    private boolean empty = true; 

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }
}

生产者:

import java.util.Random;

public class Producer implements Runnable {
    private Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        String importantInfo[] = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };
        Random random = new Random();

        for (int i = 0;
             i < importantInfo.length;
             i++) {
            drop.put(importantInfo[i]);
            try {
                // 随机的暂停一段时间,接近实际中情况
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {}
        }
        drop.put("DONE");
    }
}

消费者:

import java.util.Random;

public class Consumer implements Runnable {
    private Drop drop;

    public Consumer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        Random random = new Random();
        for (String message = drop.take();
             ! message.equals("DONE");
             message = drop.take()) {
            System.out.format("MESSAGE RECEIVED: %s%n", message);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {}
        }
    }
}

主线程

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}
4. Immutable Objects

在并发编程中,一种被普遍认可的原则就是:尽可能的使用不可变对象来创建简单、可靠的代码。
因为 immutable objects 创建后不能被修改,所以不会出现由于线程干扰产生的错误 or 内存一致性错误。

但有时候会有这种担忧,就是使用 immutable objects 每次创建新对象的开销会不会太大,如果不使用 immutable objects,就可以避免创建过多新对象,只需要 update 就可以。

而文档中说到,这种创建对象的开销常常被过分高估,因为使用不可变对象所带来的一些效率提升可以抵消这种开销。

e.g. 使用不可变对象降低了垃圾回收所产生的额外开销,同时也可以减少一些为了维护在并发中的 mutable objects 的代码开销。

看一下举得这个例子:

public class SynchronizedRGB {

    // Values must be between 0 and 255.
    private int red;
    private int green;
    private int blue;
    private String name;

    private void check(int red,
                       int green,
                       int blue) {
        if (red < 0 || red > 255
            || green < 0 || green > 255
            || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public SynchronizedRGB(int red,
                           int green,
                           int blue,
                           String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public void set(int red,
                    int green,
                    int blue,
                    String name) {
        check(red, green, blue);
        synchronized (this) {
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }
    }

    public synchronized int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public synchronized String getName() {
        return name;
    }

    public synchronized void invert() {
        red = 255 - red;
        green = 255 - green;
        blue = 255 - blue;
        name = "Inverse of " + name;
    }
}

假如现在 线程1 在执行下列代码,已经执行到 Statement 1,然后 线程2 恰好也在执行,但是 线程2 偏偏是执行到 Statement 1 和 2 之间,此时 线程1 如果再继续执行 Statement 2,就会出现 getRGB() 和 getName() 结果不匹配的情况,这就是在并发中很容易出现的问题。

SynchronizedRGB color =
    new SynchronizedRGB(0, 0, 0, "Pitch Black");
...
int myColorInt = color.getRGB();      //Statement 1
String myColorName = color.getName(); //Statement 2

此时加上了 synchronized,将 Statement 1 和 2 绑定到了一起,这样并发就不会出问题了。

synchronized (color) {
    int myColorInt = color.getRGB();
    String myColorName = color.getName();
}

会出现上面这种不匹配的情况,是因为 color 是一个 mutable object,如果它变成 immutable object,就不会出现这种问题了。

对于如何定义 immutable objects,文档给出了一个 strategy
A Strategy for Defining Immutable Objects (The Java™ Tutorials >Essential Classes > Concurrency)

  1. Don"t provide "setter" methods — methods that modify fields or objects referred to by fields.
  2. Make all fields final and private.
  3. Don"t allow subclasses to override methods. The simplest way to do this is to declare the class as final. A more sophisticated approach is to make the constructor private and construct instances in factory methods.
  4. If the instance fields include references to mutable objects, don"t allow those objects to be changed:
    • Don"t provide methods that modify the mutable objects.
    • Don"t share references to the mutable objects. Never store references to external, mutable objects passed to the constructor; if necessary, create copies, and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods.

最后一点,关于对 mutable objects 的引用:如果一个对外部可变对象的引用需要被传到构造函数中,一定不要保存这个引用,如果必须要保存,就做一个该对象的 copy,然后保存这个 copy。类似的,如果是引用内部的可变对象,必要时也要创建内部可变对象的 copy,以避免在方法中返回原对象引用。

这一点是很容易被忽略的,在 Core Java 中也提到了类似的情况,作者的建议也是做 copy,不要直接引用原对象,因为很难知道这个对象引用在其他地方是不是会被改变。

根据上面的 strategy,重新定义 RGB:

final public class ImmutableRGB {

    // Values must be between 0 and 255.
    final private int red;
    final private int green;
    final private int blue;
    final private String name;

    private void check(int red,
                       int green,
                       int blue) {
        if (red < 0 || red > 255
            || green < 0 || green > 255
            || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public ImmutableRGB(int red,
                        int green,
                        int blue,
                        String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }


    public int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public String getName() {
        return name;
    }

    public ImmutableRGB invert() {
        return new ImmutableRGB(255 - red,
                       255 - green,
                       255 - blue,
                       "Inverse of " + name);
    }
}
High Level Concurrency Objects 1. Lock Objects

在 liveness 那一节中有举了一个 Alphonse 和 Gaston 鞠躬产生死锁的例子,这里产生死锁的原因在于可能 线程1 进入 bow,线程2 进入 bowBack,当 线程1 进入 bow 里的 bowBack 时,线程2 恰好也正在进入 bowBack,二者都在等彼此退出,但是却又永远不会退出,因为彼此在循环等待,会一直阻塞在这里,从而产生死锁(循环等待、不可剥夺、独自占有、保持请求)。

public class Deadlock {
    static class Friend {
        private final String name;
        public Friend(String name) {
            this.name = name;
        }
        public String getName() {
            return this.name;
        }
        public synchronized void bow(Friend bower) {
            System.out.format("%s: %s"
                + "  has bowed to me!%n", 
                this.name, bower.getName());
            bower.bowBack(this);
        }
        public synchronized void bowBack(Friend bower) {
            System.out.format("%s: %s"
                + " has bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new Runnable() {
            public void run() { alphonse.bow(gaston); }
        }).start();
        new Thread(new Runnable() {
            public void run() { gaston.bow(alphonse); }
        }).start();
    }
}

在这一节作者用 lock objects 来解决这个问题

先简单了解一下 Lock 这个接口:

Lock Objects (The Java™ Tutorials >Essential Classes > Concurrency)

Lock objects work very much like the implicit locks used by synchronized code. As with implicit locks, only one thread can own a Lock object at a time. Lock objects also support a wait/notify mechanism, through their associated Condition objects.

Lock 很像同步代码使用的内置锁,在同一时刻只有一个线程可以获得 Lock 对象。通过关联 Condition 对象,Lock 对象也支持 wait/notify 机制。

关于 Condition:
Condition (Java Platform SE 8 )

Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

在 Lock 取代了 synchronized 方法和语句的地方,Condition 相应地取代了 Object 监视器方法(wait, notify and notifyAll)的使用。

看一下 tryLock() 这个方法:
Lock (Java Platform SE 8 )

  • boolean tryLock()
    Acquires the lock only if it is free at the time of invocation.

    Acquires the lock if it is available and returns immediately
    with the value true.
    If the lock is not available then this method will return
    immediately with the value false.

    A typical usage idiom for this method would be:

     
    Lock lock = ...;
    if (lock.tryLock()) {
    try {
     // manipulate protected state

    } finally {

     lock.unlock();

    }
    } else {
    // perform alternative actions
    }


This usage ensures that the lock is unlocked if it was acquired, and
doesn"t try to unlock if the lock was not acquired.

Returns:
true if the lock was acquired and false otherwise

它会尝试获取锁,如果成功,立刻返回 true,如果失败也会立刻返回 false,API 文档中举得那个例子就是一个典型的用法,这种使用 tryLock() 的方式可以确保在获取锁后一定会释放锁,因为 Lock 和 synchronized 有一个不一样的地方在于 synchronized 会自动释放,而 Lock 不会,所以一定要保证手动释放 Lock 锁,并且这种方式也可以保证如果没获取锁的情况不会 unlock()。

下面看怎么用 Lock 解决鞠躬的死锁问题:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;

public class Safelock {
    // 让 Friend 作为一个静态内部类,因为不需要引用外部变量
    static class Friend {
        private final String name;
        // 这里使用的是可重入锁
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        // 即将到来的鞠躬
        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                // 如果都获取到了锁 or 都没获取到,就不会释放锁
                // 如果只有一个获取到了,需要释放
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            // 只有两个锁都获取到时才会返回 true
            return myLock && yourLock;
        }
            
        public void bow(Friend bower) {
            // 如果都获取到了锁
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n", 
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                // 如果只获取到了一个锁,说明其他线程抢占了另一个锁
                // 在这个情景中就是 Alphonse 鞠躬之前发现在 Gaston 正要鞠躬
                // 因此自己就不鞠躬了,从而避免了死锁
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" +
                " bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    // 同样地,也是将 BowLoop 作为静态内部类
    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }
    
        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }
            

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}
2. Executors

文档教程只是一些比较简单的介绍,先过一遍了。

2.1 Executor Interfaces

concurrent 包中主要有三个 Executor 接口

Executor

ExecutorService

ScheduledExecutorService

Executor

运行新任务。
只有一个 execute() 方法,用来创建线程。但是这个方法没有定义具体的实现方式,所以对于不同的 Executor 的实现,有不同的创建方式。
Executor (Java Platform SE 8 )

  • execute

    void execute(Runnable command)
    Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling
    thread, at the discretion of the Executor implementation.
    Parameters:
    command - the runnable task
    Throws:
    RejectedExecutionException - if this task cannot be
    accepted for execution
    NullPointerException - if command is null

execute() 接受一个 Runnable 对象。

ExecutorService

ExecutorService 除了提供 execute() 方法还提供了 submit() 方法,这个方法不仅可以接受 Runnable 对象还可以接受 Callable 对象。Callable 对象可以使任务返还执行的结果
Callable (Java Platform SE 8 )

  • call

    V call() throws Exception
    Computes a result, or throws an exception if unable to do so.
    Returns:
    computed result
    Throws:
    Exception - if unable to compute a result

call() 方法会返回计算的结果。

通过 submit() 方法返回的 Future 对象可以读取 Callable 任务的执行结果,或是管理 Callable 任务和 Runnable 任务的状态。

关于 Future 这个接口,它是表示异步计算的结果,提供的方法是用来 1. 检查计算是否完成 2. 是否等待计算完成 3. 是否查找计算的结果

简单使用的例子:

 interface ArchiveSearcher {
    String search (String target);
 }

 class App {

    ExecutorService executor = ...
    ArchiveSearcher searcher = ...

    void showSearch(final String target) throws InterruptedException {

        Future future = executor.submit(new Callable() {
            public String call() {
                return searcher.search(target);
            }
        });

        displayOtherThings(); // do other things while searching

        try {
            // 计算完成时才能用 get() 获取结果,如果没有计算完成会阻塞着
            displayText(future.get()); // use future
        } catch (ExecutionException ex) {
            cleanup();
            return;
        }

    }
    
 }

submit 里的 Callable 对象也可以替换成如下代码:

 FutureTask future =
   new FutureTask(new Callable() {
     public String call() {
       return searcher.search(target);
   }});
 executor.execute(future);

因为 FutureTask 是实现了 Runnable 接口的 Future 的实现,所以可以由 Executor 来执行。

public class FutureTask 
extends Object 
implements RunnableFuture
public interface RunnableFuture
extends Runnable, Future

ExecutorService 也提供了批量运行 Callable 任务的方法。最后,ExecutorService 还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。

ScheduledExecutorService

扩展了 ExecutorService 接口,添加了 schedule 方法。

public interface ScheduledExecutorService
extends ExecutorService

通过 schedule 方法可以让命令在给定延迟的时间之后执行或者定期执行。

2.2 Thread Pools

这里只是对线程池的一个简单的介绍。

大多数 concurrent 包里的 executor 的实现都使用了线程池(由 worker 线程组成),worker 线程独立于它所执行的 Runnable 任务和 Callable 任务,并且经常用来执行多个任务。

使用 worker 线程可以使创建线程的开销最小化。在大规模并发应用中,创建大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。

一种最常见的线程池是 fixed thread pool(固定大小的线程池)。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。 使用 fixed thread pool 的一个很重要的好处是可以 degrade gracefully。

比如一个 Web 服务器,每一个 HTTP 请求都是由一个多带带的线程来处理,不可能为每一个 HTTP 请求都创建一个新线程,这样的话当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制 Web 服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。

创建一个使用 fixed thread pool 的 executor 的最简单的方法是调用 java.util.concurrent.Executors 的 newFixedThreadPool 工厂方法。

Executors 还提供了下面的工厂方法:

newCachedThreadPool

newSingleThreadExecutor

还有一些创建 ScheduledExecutorService executor 的方法。

还有其他的,如 ThreadPoolExecutor or ScheduledThreadPoolExecutor

2.3 Fork/Join

Basic use:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

思想有点像分而治之的感觉,主要是可以充分利用多处理器系统的并行处理能力。

文档中举了一个图片模糊处理的例子:

假设你想要模糊一张图片。原始的 source 图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与 source 图片相同,模糊之后的 destination 图片也由一个整数数组表示。 对图片的模糊操作是通过对 source 数组中的每一个像素点进行处理完成的。

处理的过程:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝)放在一起取平均值,得到的结果被放入 destination 数组。

因为一张图片会由一个很大的数组来表示,所以处理图片过程可能会很耗时,但是如果使用 fork/join 框架来完成,就可以充分利用多处理器系统的并行处理能力,加快处理速度。

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

现在实现抽象方法 compute(),在处理时,可以直接计算,也可以将其分开计算,取决于一个阈值,这个阈值可以简单地用数组的长度来代表。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

因为 fork/join 的核心就是 ForkJoinPool 类,ForkJoinPool 可以执行 ForkJoinTask 任务,而 RecursiveAction 继承了 ForkJoinTask。所以上面这个方法如果是在 RecursiveAction 类中,就可以在 ForkJoinPool 中设置任务并令其执行。

// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

// Create the ForkJoinPool that will run the task.
ForkJoinPool pool = new ForkJoinPool();

// Run the task.
pool.invoke(fb);
参考资料

The Java™ Tutorials

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69669.html

相关文章

发表评论

0条评论

Warren

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<