资讯专栏INFORMATION COLUMN

java 对线程安全支持有哪些?

sewerganger / 3243人阅读

摘要:它能阻塞一组线程直到某个事件发生。与闭锁的区别所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。闭锁一旦进入终止状态,就不能被重置,它是一次性对象,而栅栏可以重置。

同步容器。它的原理是将状态封装起来,并对每个公有方法都实行同步,使得每次只有1个线程能够访问容器的状态。

Vector和HashTable

Collections.synchronizedXXX方法

同步容器的问题

这种方式使得对容器的访问都串行化,严重降低了并发性,如果多个线程来竞争容器的锁时,吞吐量严重降低

对容器的多个方法的复合操作,是线程不安全的,比如一个线程负责删除,另一个线程负责查询,有可能出现越界的异常

并发容器。java.util.concurrent包里面的一系列实现

Concurrent开头系列。以ConcurrentHashMap为例,它的实现原理为分段锁。默认情况下有16个,每个锁守护1/16的散列数据,这样保证了并发量能达到16

分段锁缺陷在于虽然一般情况下只要一个锁,但是遇到需要扩容等类似的事情,只能去获取所有的锁

ConcurrentHashMap一些问题

需要对整个容器中的内容进行计算的方法,比如size、isEmpty、contains等等。由于并发的存在,在计算的过程中可能已进过期了,它实际上就是个估计值,但是在并发的场景下,需要使用的场景是很少的。
以ConcurrentHashMap的size方法为例:

/**
    * Returns the number of key-value mappings in this map.  If the
    * map contains more than Integer.MAX_VALUE elements, returns
    * Integer.MAX_VALUE.
    *
    * @return the number of key-value mappings in this map
    */
   public int size() {
       //为了能够算准数量,会算2次,如果两次算的不准,就锁住再算
       final Segment[] segments = this.segments;
       int size;
       boolean overflow; // true if size overflows 32 bits
       long sum;         // sum of modCounts
       long last = 0L;   // previous sum
       int retries = -1; // 第一轮的计算总数不重试
       try {
           for (;;) {
               if (retries++ == RETRIES_BEFORE_LOCK) {
               //RETRIES_BEFORE_LOCK 默认是2
                   for (int j = 0; j < segments.length; ++j)
                       ensureSegment(j).lock(); // force creation
               }
               sum = 0L;
               size = 0;
               overflow = false;
               for (int j = 0; j < segments.length; ++j) {
                   Segment seg = segmentAt(segments, j);
                   if (seg != null) {
                       sum += seg.modCount;
                       int c = seg.count;
                       if (c < 0 || (size += c) < 0)
                           overflow = true;
                   }
               }
               //第一次计算的时候
               if (sum == last)
                   break; //如果前后两次数数一致,就认为已经算好了
               last = sum;
           }
       } finally {
           if (retries > RETRIES_BEFORE_LOCK) {
               for (int j = 0; j < segments.length; ++j)
                   segmentAt(segments, j).unlock();
           }
       }
       return overflow ? Integer.MAX_VALUE : size;
   }

不能提供线程独占的功能

CopyOnWrite系列。以CopyOnWriteArrayList为例,只在每次修改的时候,进行加锁控制,修改会创建并重新发布一个新的容器副本,其它时候由于都是事实上不可变的,也就不会出现线程安全问题

CopyOnWrite的问题

每次修改都复制底层数组,存在开销,因此使用场景一般是迭代操作远多于修改操作

CopyOnWriteArrayList的读写示例
/**
   * Appends the specified element to the end of this list.
    *
   * @param e element to be appended to this list
  * @return true (as specified by {@link Collection#add})
 */
public boolean add(E e) {
       final ReentrantLock lock = this.lock;
      lock.lock();
     try {
        Object[] elements = getArray();
       int len = elements.length;
      Object[] newElements = Arrays.copyOf(elements, len + 1);
     newElements[len] = e;
    setArray(newElements);
   return true;
} finally {
  lock.unlock();
}
}
       /**
      * {@inheritDoc}
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E get(int index) {
        return get(getArray(), index);
    }
    /**
   * Gets the array.  Non-private so as to also be accessible
   * from CopyOnWriteArraySet class.
   */
    final Object[] getArray() {
       return array;
    }
    private E get(Object[] a, int index) {
        return (E) a[index];
     }

java中的同步工具类

阻塞队列,BlockingQueue。它提供了put和take方法,在队列不满足各自条件时将产生阻塞

BlockingQueue使用示例,生产者-消费者

public static void main(String[] args) throws Exception {
       BlockingQueue queue = new ArrayBlockingQueue(1024);
       Producer producer = new Producer(queue);
       Consumer consumer = new Consumer(queue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}
public class Producer implements Runnable{
   protected BlockingQueue queue = null;

   public Producer(BlockingQueue queue) {
       this.queue = queue;
   }
   
   public void run() {
       try {
           queue.put("1");
           Thread.sleep(1000);
           queue.put("2");
           Thread.sleep(2000);
           queue.put("3");
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}
public class Consumer implements Runnable{
   
   protected BlockingQueue queue = null;
   
   public Consumer(BlockingQueue queue) {
       this.queue = queue;
   }
   
   public void run() {
       try {
           System.out.println(queue.take());
           System.out.println("Wait 1 sec");
           System.out.println(queue.take());
           System.out.println("Wait 2 sec");
           System.out.println(queue.take());
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

输出为

1
Wait 1 sec
2
Wait 2 sec
3

闭锁

CountDownLatch。使多个线程等待一组事件发生,它包含一个计数器,表示需要等待的事件的数量,每发生一个事,就递减一次,当减为0时,所有事情发生,允许“通行”

CountDownLatch示例:

public class TestHarness{
   public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
   final CountDownLatch startGate = new CountDownLatch(1);
   final CountDownLatch endGate = new CountDownLatch(nThreads);
   for (int i=0;i
启动门使主线程能够同时释放所有的工作线程,结束门使得主线程能够等待最后一个线程执行完
- FutureTask。Future.get的如果任务执行完成,则立即返回,否则将阻塞直到任务完结,再返回结果或者是抛出异常

信号量,Semaphore 。它管理着一组虚拟的许可,许可的数量可通过构造函数指定,在执行操作时首先获得许可,并在使用后释放许可,如果没有,那么accquire将阻塞直到有许可。

Semaphore示例

public class BoundedHashSet{
   private final Set set;
   private final Semaphore sem;

   public BoundedHashSet(int bound) {
       this.set = Collections.synchronizedSet(new HashSet());
       this.sem = new Semaphore(bound);
   }
   public boolean add(T o) throws InterruptedException {
       sem.acquire();
       boolean wasAdded = false;
       try {
           wasAdded = set.add(o);
          return wasAdded;
       }finally {
           if (!wasAdded){
               sem.release();
           }
       }
   }
   public boolean remove(Object o){
       boolean wasRemoved = set.remove(o);
       if(wasRemoved){
          sem.release();
       }
       return wasRemoved;
           
   }
}

栅栏。它能阻塞一组线程直到某个事件发生。
与闭锁的区别:

所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。

闭锁一旦进入终止状态,就不能被重置,它是一次性对象,而栅栏可以重置

CyclicBarrier。可以使一定数量的参与方反复地在栅栏位置汇集

CyclicBarrier使用示例

public static void main(String[] args) {
//第k步执行完才能执行第k+1步
       CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne());
       StageK[] stageKs = new StageK[3];
       for (int i=0;i<3;i++){
           stageKs[i] = new StageK(barrier,"k part "+(i+1));
       }
       for (int i=0;i<3;i++){
           new Thread(stageKs[i]).start();
       }
}    
class StageKPlusOne implements Runnable{
   @Override
   public void run() {
       System.out.println("stage k over");
       System.out.println("stage k+1 start counting");
   }
}
class StageK implements Runnable{
   private CyclicBarrier barrier;
   private String stage;
   
   public StageK(CyclicBarrier barrier, String stage) {
       this.barrier = barrier;
       this.stage = stage;
   }
   
   @Override
   public void run() {
       System.out.println("stage "+stage+" counting...");
       try {
           TimeUnit.MILLISECONDS.sleep(500);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println("stage "+stage+" count over");
       try {
           barrier.await();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (BrokenBarrierException e) {
           e.printStackTrace();
       }
   }
}

输出为

stage k part 1 counting...
stage k part 3 counting...
stage k part 2 counting...
stage k part 2 count over
stage k part 3 count over
stage k part 1 count over
stage k over
stage k+1 start counting

Exchanger。它是一种两方栅栏,各方在栅栏位置交换数据

Exchanger 使用示例:
public static void main(String[] args) {
       Exchanger exchanger = new Exchanger();
        ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
        ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
        new Thread(er1).start();
        new Thread(er2).start();
    
    }
    class ExchangerRunnable implements Runnable{
    
    private Exchanger e;
    private Object o;

    public ExchangerRunnable(Exchanger e, Object o) {
       this.e = e;
        this.o = o;
}
   
    @Override
    public void run() {
       Object pre=o;
        try {
            o=e.exchange(o);
            System.out.println("pre:"+pre+" now:"+o);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
    }
}

输出如下

pre:1 now:2
pre:2 now:1

附录

案例

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72396.html

相关文章

  • 超实用百道Java面试题

    摘要:是的简称,运行环境,为的运行提供了所需的环境。分割字符串,返回分割后的字符串数组。当计算的值相同时,我们称之为冲突,的做法是用链表和红黑树存储相同的值的。迭代器取代了集合框架中的,迭代器允许调用者在迭代过程中移除元素。 Java基础1.JDK和JRE有什么区别? JDK 是java development kit的简称,java开发工具包,提供java的开发环境和运行环境。JRE 是j...

    MkkHou 评论0 收藏0
  • Java 最常见 200+ 面试题全解析:面试必备(附答案)

    摘要:的简称,运行环境,为的运行提供了所需环境。分割字符串,返回一个分割后的字符串数组。线程安全是线程安全的,而是非线程安全的。迭代器取代了集合框架中的,迭代器允许调用者在迭代过程中移除元素。 本文分为十九个模块,分别是: Java 基础、容器、多线程、反射、对象拷贝、Java Web 、异常、网络、设计模式、Spring/Spring MVC、Spring Boot/Spring Clou...

    hufeng 评论0 收藏0
  • Java面试题

    摘要:近段时间在准备实习的面试,在网上看到一份面试题,就慢慢试着做,争取每天积累一点点。现在每天给自己在面试题编写的任务是题,有时候忙起来可能就没有时间写了,但是争取日更,即使当天没更也会在之后的更新补上。     近段时间在准备实习的面试,在网上看到一份面试题,就慢慢试着做,争取每天积累一点点。    暂时手头上的面试题只有一份,题量还是挺大的,有208题,所以可能讲的不是很详细,只是我自...

    OnlyMyRailgun 评论0 收藏0
  • 假如我是面试官,我会这样虐你

    摘要:又是金三银四的时候,我希望这份面试题能够祝你一臂之力自我和项目相关自我介绍你觉得自己的优点是你觉得自己有啥缺点你有哪些你为什么要离开上家公司你上家公司在,我们公司在,离这么远为什么要选择我们这里上家公司的同事和领导是怎么评价你的介绍下你的上 又是金三银四的时候,我希望这份面试题能够祝你一臂之力! 自我和项目相关 1、自我介绍 2、你觉得自己的优点是?你觉得自己有啥缺点? 3、你有哪些 ...

    Benedict Evans 评论0 收藏0
  • 过滤器监听器面试题都在这里

    摘要:中的异步处理指的是什么中的异步处理指的是什么答在中引入了一项新的技术可以让异步处理请求。开启异步处理代码开启异步支持启动异步处理的上下文在此处添加异步处理的代码如果文章有错的地方欢迎指正,大家互相交流。 以下我是归纳的过滤器监听器知识点图: showImg(https://segmentfault.com/img/remote/1460000013263166?w=3974&h=187...

    crelaber 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<