资讯专栏INFORMATION COLUMN

实战java高并发程序设计第三章(一)

joyvw / 2604人阅读

摘要:只要线程池未关闭该策略直接在调用者线程中运行当前被丢弃的任务。显然这样做不会真的丢弃任务但是任务提交线程的性能极有可能会急剧下降。任务并尝试再次提交当前任务。

1. 同步控制

synchronized的扩展:重入锁

同步控制不仅有synchronized配合object.wait()以及object.notify(),也有增强版的reentrantLock(重入锁)

public class ReenterLock implements Runnable{
    public static ReentrantLock lock=new ReentrantLock();
    public static int i=0;
    @Override
    public void run() {
        for(int j=0;j<10000000;j++){
            lock.lock();
            lock.lock();        //此处演示重入性
            try{
                i++;
            }finally{
                lock.unlock();        //退出临界区必须解锁
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl=new ReenterLock();
        Thread t1=new Thread(tl);
        Thread t2=new Thread(tl);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);    //计算结果为 20000000

    }
}

我们来看下reentrantlock相比synchronized锁有何优点:

中断响应

面对死锁,似乎synchronized没有任何主动解决策略,而reentrantlock则可以轻松解决

public class IntLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    /**
     * 控制加锁顺序,方便构造死锁
     * @param lock
     */
    public IntLock(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();        //可中断的加锁
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock2.lockInterruptibly();
            } else {
                lock2.lockInterruptibly();
                try{
                    Thread.sleep(500);
                }catch(InterruptedException e){}
                lock1.lockInterruptibly();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(Thread.currentThread().getName()+":线程被中断");
        } finally {
            if (lock1.isHeldByCurrentThread())
                lock1.unlock();
            if (lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getName()+":线程退出");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        IntLock r1 = new IntLock(1);
        IntLock r2 = new IntLock(2);
        Thread t1 = new Thread(r1,"线程1");
        Thread t2 = new Thread(r2,"线程2");
        t1.start();t2.start();
        Thread.sleep(1000);
        //中断其中一个线程
        t2.interrupt();
    }
}
//    输出结果:
//    java.lang.InterruptedException
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
//        at //java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
//        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
//        at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
//        at java.lang.Thread.run(Thread.java:745)
//    线程2:线程被中断
//    线程2:线程退出
//    线程1:线程退出

由上可知,当t1,t2形成死锁时,可以主动利用中断来解开,但完成任务的只有t1,t2被中断. 而如果换成synchronized则将无法进行中断

1锁申请等待时限

lock1.tryLock();                            //尝试获取锁,获得立即返回true,未获得立即返回false
lock1.tryLock(5, TimeUnit.SECONDS);         //尝试获取锁,5秒内未获得则返回false,获得返回true
public class TryLock implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    public TryLock(int lock) {
        this.lock = lock;
    }
    @Override
    public void run() {
        if (lock == 1) {
            while (true) {
                if (lock1.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock2.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock2.unlock();
                            }
                        }
                    } finally {
                        lock1.unlock();
                    }
                }
            }
        } else {
            while (true) {
                if (lock2.tryLock()) {
                    try {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                        }
                        if (lock1.tryLock()) {
                            try {
                                System.out.println(Thread.currentThread()
                                        .getId() + ":My Job done");
                                return;
                            } finally {
                                lock1.unlock();
                            }
                        }
                    } finally {
                        lock2.unlock();
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        TryLock r1 = new TryLock(1);
        TryLock r2 = new TryLock(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
}
//    15:My Job done
//    14:My Job done

使用trylock可以有效地避免产生死锁

公平锁

synchronized锁为非公平锁,而reentrantLock既可以是公平锁也可以是非公平锁
非公平锁容易产生饥饿,公平锁先进先出,但效率不敌非公平锁

public ReentrantLock(boolean fair)

ffffd

重入锁的搭档Condition

Condition和object.wait(),object.notify()方法类似
condition的基本方法如下:

void await() throws InterruptedException;        //使当前线程等待,释放锁,能响应signal和signalAll方法,响应中断
void awaitUninterruptibly();                    //类似 await,但不响应中断
long awaitNanos(long nanosTimeout)throws InterruptedException;    //等待一段时间
boolean await (long time,TimeUnit unit)throws InterruptedException;
boolean awaitUntil(Date deadline)throws InterruptedException;
void signal();    //唤醒一个等待中的线程
void signalAll();  //唤醒所有等待中的线程

JDK内部就有很多对于ReentrantLock的使用,如ArrayBlockingQueue

    //在 ArrayBlockingQueue中的一些定义
    boolean fair = true;
    private final ReentrantLock lock = new ReentrantLock(fair);
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    //put(方法的实现  
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();       //put方法做同步
        try {
            try {
                while (count == items.length) //队列已满
                    notFull.await();        //等待队列有足够的空间
            } catch (InterruptedException ie) {
                notFull.signal();
                throw ie;
            }
            insert(e);                  //notFull被通知时,说明有足够的空间
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notFull.signal();           //通知take方法的线程,队列已有数据
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();         //对take()方法做同步
        try {
            try {
                while (count == 0)          //如果队列为空
                    notEmpty.await();        //则消费者队列要等待一个非空的信号
            } catch (InterruptedException ie) {
                notEmpty.signal();
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();    //通知put线程队列已有空闲空间
        return x;
    }

多线程同时访问:信号量(semaphore)

同步锁只能允许一个线程进行访问,信号量可以指定多个线程同时访问同一个资源.

    //构造方法
    public Semaphore(int permits)                //传入int表示能同时访问的线程数
    public Semaphore(int permits, boolean fair)  //线程数,是否公平锁
    
    //实例方法
    public void acquire() throws InterruptedException        //获取一个访问权限,会阻塞线程,会被打断
    public void acquireUninterruptibly()                     //获取一个访问权限,会阻塞线程,不会被打断
    public boolean tryAcquire()                              //获取一个访问权限,立即返回
    public boolean tryAcquire(long timeout, TimeUnit unit)   //获取一个访问权限,尝试一段时间
    public void release()                                    //释放一个访问权限
public class SemapDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);
    @Override
    public void run() {
        try {
            semp.acquire();                
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();                //使用完后要释放,否则会引起信号量泄漏
        }
    }
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

    //输出结果
    //每次输出5个结果,对应信号量的5个许可

读写锁ReadWriteLock
读写锁适用于读多写少的场景,读读之间为并行,读写之间为串行,写写之间也为串行

public class ReadWriteLockDemo {
    private static Lock lock=new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();       //获取读写锁
    private static Lock readLock = readWriteLock.readLock();                                //读锁
    private static Lock writeLock = readWriteLock.writeLock();                              //写锁
    private int value;
    public Object handleRead(Lock lock) throws InterruptedException{
        try{
            lock.lock();                //模拟读操作
            Thread.sleep(1000);            //读操作的耗时越多,读写锁的优势就越明显
            return value;                
        }finally{
        lock.unlock();
        }
    }
    public void handleWrite(Lock lock,int index) throws InterruptedException{
        try{
            lock.lock();                //模拟写操作
            Thread.sleep(1000);
            value=index;
        }finally{
        lock.unlock();
        }
    }
    public static void main(String[] args) {
        final ReadWriteLockDemo demo=new ReadWriteLockDemo();
        Runnable readRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readLock);
//                    demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable writeRunnale=new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(writeLock,new Random().nextInt());
//                    demo.handleWrite(lock,new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for(int i=0;i<18;i++){
            new Thread(readRunnale).start();
        }
        
        for(int i=18;i<20;i++){
            new Thread(writeRunnale).start();
        }    
    }
}

//结果:
//读写锁明显要比单纯的锁要更快结束,说明读写锁确实提升不少效率

倒计数器CountDownLatch

让一个线程等待,知道倒计时结束

public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);        //构造倒计时器,倒计数为10
    static final CountDownLatchDemo demo=new CountDownLatchDemo();   
    @Override
    public void run() {
        try {
            //模拟检查任务
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check complete");
            end.countDown();                                        //倒计时器减1
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            exec.submit(demo);
        }
        //等待检查
        end.await();                        //主线程阻塞,待其他线程全部完成后再唤醒主线程
        //发射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

循环栅栏CyclicBarrier
循环栅栏类似于倒计时器,但是计数器可以反复使用,cyclicBarrier比CountDownLatch稍微强大些,可以传入一个barrierAction,barrierAction指每次完成计数便出发一次

public CyclicBarrier(int parties,Runnable barrierAction)  //构造方法
public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;
        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }
        public void run() {
            try {
                //等待所有士兵到齐
                cyclic.await();                        //触发一次循环栅栏,达到计数器后才会进行下一步工作
                doWork();
                //等待所有士兵完成工作
                cyclic.await();                        //再次触发循环栅栏,达到计数器后才会进行下一步工作
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt()%10000));        //模拟工作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任务完成");
        }
    }
    public static class BarrierRun implements Runnable {            //用于传入CyclicBarrier的构造方法,作为达到计数器数值后的触发任务, 可以被多次调用
        boolean flag;
        int N;
        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }
        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "个,任务完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                flag = true;
            }
        }
    }
    public static void main(String args[]) throws InterruptedException {
        final int N = 10;
        Thread[] allSoldier=new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍!");
        for (int i = 0; i < N; ++i) {
            System.out.println("士兵 "+i+" 报道!");
            allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }
}
注意:
一旦其中一个被interrupt后,很可能会抛出一个interruptExpection和9个BrokenBarrierException,表示该循环栅栏已破损,防止其他线程进行无所谓的长久等待

线程阻塞工具LockSupport
LockSupport是一个非常实用的线程阻塞工具,不需要获取某个对象的锁(如wait),也不会抛出interruptedException异常

public static void park()        //挂起当前线程,
public static void park(Object blocker)        //挂起当前线程,显示阻塞对象,parking to wait for <地址值>
public class LockSupportDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park(this);                
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);    //即使unpark发生在park前,也可以使程序正常结束
        t1.join();
        t2.join();
    }
}
LockSupport使用了类似信号量的机制,它为每个线程准备一个许可,如果许可可用,park立即返回,并且消费这个许可(转为不可用),如果许可不可用,就会阻塞,而unpark方法就是使一个许可变为可用
locksupport.park()可以相应中断,但是不会抛出interruptedException,我们可以用Thread.interrupted等方法中获取中断标记.
public class LockSupportIntDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
        public ChangeObjectThread(String name){
            super.setName(name);
        }
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in "+getName());
                LockSupport.park();
                if(Thread.interrupted()){                        //检测到中断位,并清除中断状态
                    System.out.println(getName()+" 被中断了");
                }
                if (Thread.currentThread().isInterrupted()){    //中断状态已被清除,无法检测到
                    System.out.println(1);
                }
            }
            System.out.println(getName()+"执行结束");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
//输出:
//in t1
//t1 被中断了
//t1执行结束
//in t2
//t2执行结束

Guava和Limiter限流
限流算法一般有两种:漏桶算法和令牌桶算法
漏桶算法: 利用缓存区,所有请求进入系统,都在缓存区中保存,然后以固定的流速流出缓存区进行处理.
令牌桶算法: 桶中存放令牌,每个请求拿到令牌后才能进行处理,如果没有令牌,请求要么等待,要么丢弃.RateLimiter就是采用这种算法

public class RateLimiterDemo {
    static RateLimiter limiter = RateLimiter.create(2);        //每秒处理2个请求
    public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis());
        }
    }
    public static void main(String args[]) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            limiter.acquire();            //过剩流量会等待
            new Thread(new Task()).start();
        }
    }
}
//  某些场景倾向于丢弃过剩流量,tryAcquire则是立即返回,不会阻塞
//        for (int i = 0; i < 50; i++) {        
//            if(!limiter.tryAcquire()) {
//                continue;
//            }
//            new Thread(new Task()).start();
//        }
2. 线程池

Executors框架

Executor框架提供了各种类型的线程池,主要有以下工厂方法:

//固定线程数量,当有新任务提交时,若池中有空闲线程则立即执行,若没有空闲线程,任务会被暂存在一个任务队列中,直到有空闲线程
public static ExecutorService newFixedThreadPool(int nThreads)
//返回只有一个线程的线程池,多余任务被保存到一个任务队列中,线程空闲时,按先入先出的顺序执行队列中的任务
public static ExecutorService newSingleThreadPoolExecutor()
//线程数量不固定,优先使用空闲线程,多余任务会创建新线程
public static ExecutorService newCachedThreadPool()
//线程数量为1,给定时间执行某任务,或周期性执行任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
//线程数量可以指定,定时或周期性执行任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

计划任务:newScheduledThreadPool主要方法

//给定时间,对任务进行一次调度
public ScheduledFuture schedule(Runnable command,long delay, TimeUnit unit);
//周期调度,以任务完成后间隔固定时间调度下一个任务,(两者相加)
public ScheduledFuture scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
//周期调度,两个任务开始的时间差为固定间隔,如果任务时间大于间隔时间则以任务时间为准(两者取其大者)
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
注意:
任务异常时后续所有任务都将停止调度,因此必须保证所有任务异常均被正常处理.

核心线程池 ThreadPoolExecutor

ThreadPoolExecutor构造函数:

 public ThreadPoolExecutor(int corePoolSize,         //核心线程池大小
                              int maximumPoolSize,  //最大线程池大小
                              long keepAliveTime,  //线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
                              TimeUnit unit,  //keepAliveTime时间单位
                              BlockingQueue workQueue, //阻塞任务队列
                              ThreadFactory threadFactory,  //新建线程工厂
                              RejectedExecutionHandler handler ) //当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

workQueue指被提交但是未执行的任务队列,是BlockingQueue接口的对象
1.直接提交队列:SynchronousQueue,该队列没有容量,每个插入操作对应一个删除操作,即提交的任务总是会交给线程执行,如果没有空闲进程,则创建新线程,数量达最大则执行拒绝策略,一般需要设置很大的maximumPoolSize
2.有界任务队列:ArrayBlockingQueue,有新任务时,若线程池的实际线程数小于corePoolSize,优先创建新线程,若大于corePoolSize,加入到等待队列,若队列已满,不大于maximumPoolSize前提下,创建新线程执行;当且仅当等待队列满时才会创建新线程,否则数量一直维持在corePoolSize
3.无界任务队列:LinkedBlockingQueue,小于corePoolSize时创建线程,达到corePoolSize则加入队列直到资源消耗殆尽
4.优先任务队列:PriorityBlockingQueue,特殊无界队列,总是保证高优先级的任务先执行.

Executors分析
newFixedThreadPool: corePoolSize=maximumPoolSize,线程不会超过corePoolSize,使用LinkedBlockingQueue
newSingleThreadPoolExecutor: newFixedThreadPool的弱化版,corePoolSize只有1
newCachedThreadPool: corePoolSize=0,maximumPoolSize为无穷大,空闲线程60秒回收,使用SynchronousQueue队列

ThreadPoolExecutor的execute()方法执行逻辑

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn"t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {    //检查是否小于corePoolSize
            if (addWorker(command, true))    //添加线程,执行任务
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {    //添加进队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))   //双重校验
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))    //提交线程池失败
            reject(command);                    //拒绝执行
    }

拒绝策略
AbortPolicy:该策略会直接抛出异常,阻止系统正常工作。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可 能会急剧下降。 任务,并尝试再次提交当前任务。
DiscardOldestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的一个
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

自定义ThreadFactory

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue(),
                new ThreadFactory(){
                    @Override
                    public Thread newThread(Runnable r) {    //自定义创建线程的方法
                        Thread t= new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create "+t);
                        return t;
                    }
                }
               );
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }

扩展线程池

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId()
                    + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask) r).name);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask) r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();               //等待所有任务执行完毕后再关闭
    }
}

异常堆栈消息

线程池中的异常堆栈可能不会抛出,需要我们自己去包装

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    @Override
    public Future submit(Runnable task) {
        return super.submit(wrap(task, clientTrace(), Thread.currentThread()
                .getName()));
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    private Runnable wrap(final Runnable task, final Exception clientStack,
            String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();                //外层包裹trycatch,即可打印出异常
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }
}

Fork/Join框架

类似于mapreduce,用于大数据量,fork()创造子线程,join表示等待,

public class CountTask extends RecursiveTask{
    private static final int THRESHOLD = 10000;        //任务分解规模
    private long start;
    private long end;
    public CountTask(long start,long end){
        this.start=start;
        this.end=end;
    }
    @Override
    public Long compute(){
        long sum=0;
        boolean canCompute = (end-start) subTasks=new ArrayList();
            long pos=start;
            for(int i=0;i<100;i++){
                long lastOne=pos+step;
                if(lastOne>end)lastOne=end;     //最后一个任务可能小于step,故需要此步
                CountTask subTask=new CountTask(pos,lastOne);   //子任务
                pos+=step+1;        //调整下一个任务
                subTasks.add(subTask);
                subTask.fork();     //fork子任务
            }
            for(CountTask  t:subTasks){
                sum+=t.join();      //聚合任务
            }
        }
        return sum;
    }
    public static void main(String[]args){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000000000L);
        ForkJoinTask result = forkJoinPool.submit(task);
        try{
            long res = result.get();
            System.out.println("sum="+res);
        }catch(InterruptedException e){
            e.printStackTrace();
        }catch(ExecutionException e){
            e.printStackTrace();
        }
    }
}
注意:
如果任务的划分层次很多,一直得不到返回,可能有两种原因:
1.系统内线程数量越积越多,导致性能严重下降
2.函数调用层次变多,导致栈溢出

Guava对线程池的拓展

1.特殊的DirectExecutor线程池

Executor executor=MoreExecutors.directExecutor();    // 仅在当前线程运行,用于抽象

2.Daemon线程池
提供将普通线程转换为Daemon线程.很多情况下,我们不希望后台线程池阻止程序的退出

public class MoreExecutorsDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor exceutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
        MoreExecutors.getExitingExecutorService(exceutor);
        exceutor.execute(() -> System.out.println("I am running in " + Thread.currentThread().getName()));
    }
}

3.future模式扩展
待续....

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75977.html

相关文章

  • 实战java并发程序设计第三(二)

    摘要:的并发容器并发集合这是一个高效的并发你可以把它理解为一个线程安全的。可以看作一个线程安全的这是一个接口,内部通过链表数组等方式实现了这个接口。 3. JDK的并发容器 并发集合 ConcurrentHashMap:这是一个高效的并发HashMap.你可以把它理解为一个线程安全的HashMap。 CopyOnWriteArrayList:这是一个List,从名字看就知道它和Ar...

    Sike 评论0 收藏0
  • 从小白程序路晋升为大厂级技术专家我看过哪些书籍?(建议收藏)

    摘要:大家好,我是冰河有句话叫做投资啥都不如投资自己的回报率高。马上就十一国庆假期了,给小伙伴们分享下,从小白程序员到大厂高级技术专家我看过哪些技术类书籍。 大家好,我是...

    sf_wangchong 评论0 收藏0
  • 实战Java并发程序设计5】让普通变量也享受原子操作

    摘要:有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。它可以让你在不改动或者极少改动原有代码的基础上,让普通的变量也享受操作带来的线程安全性,这样你可以修改极少的代码,来获得线程安全的保证。 有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方。但显然,这...

    appetizerio 评论0 收藏0
  • Java学习必备书籍推荐终极版!

    摘要:实战高并发程序设计推荐豆瓣评分书的质量没的说,推荐大家好好看一下。推荐,豆瓣评分,人评价本书介绍了在编程中条极具实用价值的经验规则,这些经验规则涵盖了大多数开发人员每天所面临的问题的解决方案。 很早就想把JavaGuide的书单更新一下了,昨晚加今天早上花了几个时间对之前的书单进行了分类和补充完善。虽是终极版,但一定还有很多不错的 Java 书籍我没有添加进去,会继续完善下去。希望这篇...

    Steve_Wang_ 评论0 收藏0

发表评论

0条评论

joyvw

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<