资讯专栏INFORMATION COLUMN

<jdk7学习笔记>读书笔记-并行api

bovenson / 597人阅读

摘要:然而,这两个方法都只是读取对象状态,如果只是读取操作,就可以允许线程并行,这样读取效率将会提高。分配线程执行子任务执行子任务获得子任务进行完成的结果

Lock

Lock接口主要操作类是ReentrantLock,可以起到synchronized的作用,另外也提供额外的功能。
用Lock重写上一篇中的死锁例子

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Resource {
    Lock lock=new ReentrantLock();
    int num=0;
    void doSome(){
        
    }
    public void deal(Resource res){
        while(true){
            boolean mylock=this.lock.tryLock();//尝试获得当前Resource的锁定
            boolean resLock=res.lock.tryLock();//尝试获得传入的Resource的锁定
            try{
                if(mylock&&resLock){
                    res.doSome();
                    System.out.println(res+":"+this.num);
                    break;//退出循环
                }
            }finally{
                if(mylock)
                    this.lock.unlock();
                if(resLock)
                    res.lock.unlock();
            }
        }
    }
}

重写后不会出现死锁的原因在于,当无法同时获得两个锁定时,干脆释放已获得的锁定。
上面代码使用当前Resource的Lock的tryLock()方法尝试获得锁定,以及传入Resource的Lock的tryLock()方法尝试获得锁定。只有当可以获得两个Resource的锁定,才能执行res.doSome().最后无论什么情况,都要finally解除锁定。

ReadWriteLock

ReadWriteLock接口定义了读取锁定和写入锁定的行为。可以使用readLock(),writeLock()方法返回Lock操作对象。
ReentrantReadWriteLock是ReadWriteLock接口的主要操作类.
ReentrantReadWriteLock.readLock操作Lock接口,调用其lock()方法时,若没有任何ReentrantReadWriteLock.writeLock调用过lock()方法,也就是没有任何写入锁定时,才可以取得读取锁定。
下面用ReadWriteLock试着写一个ArrayList

import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyArrayList {
    private ReadWriteLock lock=new ReentrantReadWriteLock();
    private Object[] list;
    private int next=0;
    public MyArrayList(){
        list=new Object[16];
    }
    public void add(T obj){
        try{
            lock.writeLock().lock();//获取写入锁定
            if(next==list.length)
                list=Arrays.copyOf(list, list.length*2);
            list[next++]=obj;
        }finally{
            lock.writeLock().unlock();//解除写入锁定
        }
    }
    @SuppressWarnings("unchecked")
    public T get(int index){
        try{
            lock.readLock().lock();//获取读取锁定
            return (T) list[index];
        }finally{
            lock.readLock().unlock();//解除读取锁定
        }
    }
    public int size(){
        try{
            lock.readLock().lock();
            return next;
        }finally{
            lock.readLock().unlock();
        }
    }
}

重写后的效果是

若有线程调用add()方法进行写入操作,先获得写入锁定。这时如果有其他线程准备获得写入锁定或读取锁定,都必须等待前面的写入锁定解除。

若有线程调用get()方法进行读取操作,先获得读取锁定。这时如果有其他线程准备获得读取锁定,则可以获得;但如果是准备获得写入锁定,仍然要等待所有读取锁定解除。

使用ReadWriteLock的好处在于,如果有两个线程都想调用get()和size()方法,由于锁定的关系,其中一个线程只能等到另一个线程解除锁定。然而,这两个方法都只是读取对象状态,如果只是读取操作,就可以允许线程并行,这样读取效率将会提高。

Condition

Condition接口用来搭配Lock,最基本的用法就是达到Object的wait(),notify(),notifyAll()方法的作用。
下面用wait(),notify(),notifyAll()实现生产者与消费者.

店员从生产者获得生产出的商品,消费者从店员取走商品

若生产者生产速度较快,店员那可能有很多商品,店员会叫生产者停一下。过一段时间,店员那商品不多了,再通知生产者继续生产

若消费者取走速度过快,店员那可能没有商品可供取走,店员会叫消费者停一下。过一段时间,店员那有商品了,再通知消费者过来取

这里假定店员那最多只能放一件商品

public class Producer implements Runnable{
    private Clerk clerk;
    public Producer(Clerk clerk){
        this.clerk=clerk;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
                Thread.sleep((int)Math.random()*3000);
            } catch (InterruptedException e) {
                
            }
            clerk.setProduct(i);
        }
    }
}
public class Consumer implements Runnable{
    private Clerk clerk;
    public Consumer(Clerk clerk){
        this.clerk=clerk;
    }
    @Override
    public void run() {
        for(int i=0;i<10;i++){
            try {
                Thread.sleep((int)Math.random()*3000);
            } catch (InterruptedException e) {
                
            }
            clerk.getProduct();
        }
    }
}
public class Clerk extends Thread{
    private int product=-1;//没有商品
    public synchronized void setProduct(int product){
        while(this.product!=-1){
            try {
                wait();//店员那有商品,生产者停一下
            } catch (InterruptedException e) {
                
            }
        }
        this.product=product;
        System.out.println("生产者生产商品"+this.product);
        notify();//通知等待集合(唤醒的可能是消费者,也可能是生产者)
    }
    public synchronized int getProduct(){
        while(this.product==-1){
            try {
                wait();//店员没有商品,消费者停一下
            } catch (InterruptedException e) {
                
            }
        }
        int p=this.product;
        System.out.println("消费者消费商品"+this.product);
        this.product=-1;//商品已经被取走
        notify();
        return p;
    }
    public static void main(String[] args){
        Clerk clerk=new Clerk();
        new Thread(new Producer(clerk)).start();
        new Thread(new Consumer(clerk)).start();
    }
}
生产者生产商品0
消费者消费商品0
生产者生产商品1
消费者消费商品1
生产者生产商品2
消费者消费商品2
生产者生产商品3
消费者消费商品3
生产者生产商品4
消费者消费商品4
生产者生产商品5
消费者消费商品5
生产者生产商品6
消费者消费商品6
生产者生产商品7
消费者消费商品7
生产者生产商品8
消费者消费商品8
生产者生产商品9
消费者消费商品9

现在用Condition接口重写

public class Clerk {
    private int product=-1;//没有商品
    Lock lock=new ReentrantLock();
    private Condition condition=lock.newCondition();
    public void setProduct(int product){
        try{
            lock.lock();
            while(this.product!=-1){
                try {
                    condition.await();//店员那有商品,生产者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            this.product=product;
            System.out.println("生产者生产商品"+this.product);
            condition.signal();//通知等待集合(唤醒的可能是消费者,也可能是生产者)
        }finally{
            lock.unlock();
        }
    }
    public int getProduct(){
        try{
            lock.lock();
            while(this.product==-1){
                try {
                    condition.await();//店员没有商品,消费者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            int p=this.product;
            System.out.println("消费者消费商品"+this.product);
            this.product=-1;//商品已经被取走
            condition.signal();
            return p;
        }finally{
            lock.unlock();
        }
    }
    public static void main(String[] args){
        Clerk clerk=new Clerk();
        new Thread(new Producer(clerk)).start();
        new Thread(new Consumer(clerk)).start();
    }
}

注意在多个生产者,消费者线程的情况下,等待集合中两者都会有,而condition.signal()从等待集合中唤醒的具体对象是不确定的。有可能消费者取走商品后,唤醒的还是消费者,这时,消费者又会执行while循环,进入等待集合。
事实上,一个Condition对象可以表示一个等待集合。这样上面例子,可以有两个等待集合,一个给消费者用,一个给生产者用。生产者只会通知消费者的等待集合,消费者也只会通知生产者的等待集合。这样效率会高些。

public class Clerk {
    ...
    private Condition producerCondition=lock.newCondition();//生产者的等待集合
    private Condition consumerCondition=lock.newCondition();//消费者的等待集合
    public void setProduct(int product){
        try{
            lock.lock();
            while(this.product!=-1){
                try {
                    producerCondition.await();//店员那有商品,生产者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            this.product=product;
            System.out.println("生产者生产商品"+this.product);
            consumerCondition.signal();//唤醒消费者等待集合
        }finally{
            lock.unlock();
        }
    }
    public int getProduct(){
        try{
            lock.lock();
            while(this.product==-1){
                try {
                    consumerCondition.await();//店员没有商品,消费者停一下
                } catch (InterruptedException e) {
                    
                }
            }
            int p=this.product;
            System.out.println("消费者消费商品"+this.product);
            this.product=-1;//商品已经被取走
            producerCondition.signal();//唤醒生产者等待集合
            return p;
        }finally{
            lock.unlock();
        }
    }
    ...
}
Executor

定义Executor接口的目的是将Runnable的指定与如何执行分离。它只定义了一个execute()方法。

public class Page{
    private Executor executor;
    public Page(Executor executor){
        this.executor=executor;
    }
    ...
    public void method1(){
        ...
        executor.execute(new Runnable(){
            @Override
            public void run(){
                ...
            }
        });
        ...
    }

}

public class DirectExecutor implements Executor{
    public void execute(Runnable r){
        r.run();
    }
}

调用

new Page(new DirectExecutor()).method1();

Executor api

ThreadPoolExecutor

像线程池这类服务,实际上是定义在Executor接口的子接口ExecutorService中。通用的ExecutorService由抽象类AbstractExecutorService操作,如果需要线程池功能,可以使用其子类ThreadPoolExecutor.
重写上面executor例子

ExecutorService executorService=Executors.newCachedThreadPool();
new Page(executorService).method1();
executorService.shutdown();//在指定执行的Runnable都完成后,将ExecutorService关闭
Future与Callable

ExecutorService还定义了submit(),invokeAll(),invokeAny()等方法,这些方法出现在java.util.concurrent.Future,java.util.concurrent.Callable接口
Future定义的行为就是让你在将来取得结果。你可以将想执行的工作交给Future,Future会使用另一个线程处理,你可以先做别的事情。过些时候,再调用Future的get()获得结果。
如果结果已经产生,get()会直接返回,否则会进入阻塞状态直到结果返回。get()的另一种重载方法可以指定等待结果的时间,若指定时间内结果还没产生,则抛出TimeoutException异常。也可以使用Future的isDone()方法看结果是否产生。
Future经常与Callable一起使用,Callable的作用与Runnable相似,都是用来定义执行的流程。

Runnable的run()方法无返回值,也无法抛出异常

Callable的call()方法可以有返回值,也可以抛出异常

FutureTask是Future的操作类,创建时可传入Callable对象指定执行的流程

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static int fib(int n){
        return n<=1?n:fib(n-1)+fib(n-2);
    }
    public static void main(String[] args){
        FutureTask task=new FutureTask(
                new Callable(){
                    @Override
                    public Integer call() throws Exception {
                        return fib(30);
                    }    
                }
        );
        new Thread(task).start();
        try {
            Thread.sleep(3000);
            System.out.println(task.get());
        } catch (InterruptedException|ExecutionException e) {
            
        }
    }
}

FutureTask构造类

FutureTask实现RunnableFuture接口RunnableFuture接口继承Runnable,Future接口。所以可以new Thread(task).
ExecutorService的submit()方法也可以接受Callable对象,调用后返回Future对象。

ExecutorService service=Executors.newCachedThreadPool();
Future future=service.submit(new Callable(){
    @Override
    public Integer call() throws Exception {
        return fib(30);
    }    
});

如果有多个Callable,可以先将它们收集到Collection中,然后调用ExecutorService的invokeAll()方法,返回List

如果有多个Callable,要求其中只要有一个执行完成就行了,则可以先将它们收集到Collection中,然后调用ExecutorService的invokeAny()方法

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor用来进行工作排程,其中的schedule()方法用来排定Runnable或Callable实例延迟多久执行一次,并返回Future子接口ScheduledFuture的实例。

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecution {
    public static void main(String[] args){
        ScheduledExecutorService service=Executors.newSingleThreadScheduledExecutor();
        service.scheduleWithFixedDelay(new Runnable(){
            public void run(){
                System.out.println(new Date());
                try {
                    Thread.sleep(2000);//假设工作会执行2s
                } catch (InterruptedException e) {
                    
                }
            }
        }, 2000, 1000, TimeUnit.MILLISECONDS);
    }
}
Sat Oct 24 17:11:59 CST 2015
Sat Oct 24 17:12:02 CST 2015
Sat Oct 24 17:12:05 CST 2015
Sat Oct 24 17:12:08 CST 2015
Sat Oct 24 17:12:11 CST 2015

可以看到,输出两两间相差3s.
scheduleWithFixedDelay()方法参数

如果把方法换成scheduleAtFixedRate()

Sat Oct 24 17:28:28 CST 2015
Sat Oct 24 17:28:30 CST 2015
Sat Oct 24 17:28:32 CST 2015
Sat Oct 24 17:28:34 CST 2015

每次排定的执行周期是1s,但是工作执行的时间是2s,会超过排定的执行周期,所以输出两两间相差2s。

ForkJoinPool

Future的另一个操作类ForkJoinTask,与ExecutorService的另一个操作类ForkJoinPool有关,它们都是jdk7新增的api,用来解决分而治之的问题。

ForkJoinTask操作Future接口,可以在未来获得耗时工作的执行结果

ForkJoinPool管理ForkJoinTask,调用fork()方法,可以让另一个线程执行ForkJoinTask

如果要获得ForkJoinTask的执行结果,可以调用join()方法。如果执行结果还没产生,会阻塞直至有执行结果返回

使用ForkJoinTask的子类RecursiveTask,它是个抽象类,使用时必须继承它,并操作compute()方法。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibDemo extends RecursiveTask{
    final int n;
    FibDemo(int n){
        this.n=n;
    }
    public static int fib(int n){
        return n<=1?n:fib(n-1)+fib(n-2);
    }
    @Override
    protected Integer compute() {
        if(n<=10){
            return fib(n);
        }
        FibDemo f1=new FibDemo(n-1);
        f1.fork();//ForkJoinPool分配线程执行子任务
        FibDemo f2=new FibDemo(n-2);
        return f2.compute()+f1.join();//执行f2子任务+获得f1子任务进行完成的结果
    }
    public static void main(String[] args){
        FibDemo fib=new FibDemo(40);
        ForkJoinPool pool=new ForkJoinPool();
        System.out.println(pool.invoke(fib));
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64701.html

相关文章

  • &lt;jdk7学习笔记&gt;读书笔记-线程

    摘要:如果把注释去掉,则在所以非线程都结束时,自动终止。默认所有从线程产生的线程也是线程。停止线程线程完成方法后,就进入状态。被标示为的区块会被监控,任何线程要执行区块都必须先获得指定的对象锁定。 Tread和Runnable 定义线程 实现Runnable接口,重写run()方法 继承Thread类,重写run()方法 启动线程 Runnable tortoise=ne...

    woshicixide 评论0 收藏0
  • 《HTML与CSS 第一章 认识HTML》读书笔记

    摘要:一让广播明星黯然失色要建立页面,需要创建用超文本标记语言,编写的文件,把它们放在一个服务器上二服务器能做什么服务器在互联网上有一份全天候的工作。一、Web让广播明星黯然失色    要建立Web页面,需要创建用超文本标记语言(HyperText Markup Language,HTML)编写的文件,把它们放在一个Web服务器上二、Web服务器能做什么?  Web服务器在互联网上有一份全天候的工...

    番茄西红柿 评论0 收藏0
  • &lt;javascript高级程序设计&gt;第十二章读书笔记----偏移量

    摘要:包括元素的高度上下内边距上下边框值,如果元素的的值为那么该值为。该值为元素的包含元素。最后,所有这些偏移量都是只读的,而且每次访问他们都需要重新计算。为了避免重复计算,可以将计算的值保存起来,以提高性能。 offsetHeight 包括元素的高度、上下内边距、上下边框值,如果元素的style.display的值为none,那么该值为0。offsetWidth 包括元素的宽度、左...

    dayday_up 评论0 收藏0
  • &lt;java核心技术&gt;读书笔记2

    摘要:如果需要收集参数化类型对象,只有使用警告这节讨论,向参数可变的方法传递一个泛型类型的实例。异常不能抛出或捕获泛型类的实例实际上,泛型类扩展也是不合法的。 Object:所有类的超类 java中每个类都是由它扩展而来,但是并不需要这样写:class Employee extends Object.如果没有明确指出超类,Object类就被认为是这个的超类。可以使用Object类型的变量引用...

    jimhs 评论0 收藏0
  • &lt;java核心技术&gt;读书笔记1

    摘要:关键字作用调用超类方法调用超类构造器关键字作用引用隐式参数如调用该类的其他构造器在覆盖一个方法时,子类方法可见性不能低于超类方法阻止继承类和方法目的确保它们不会在子类中改变语义。但是如果将一个类声明为后面可以改变类变量的值了。 数据类型 整型 int 存储要求:4byte 取值范围:-2147483648 -- 2147483647(超过20亿) short 存储要求:2byte 取...

    William_Sang 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<