资讯专栏INFORMATION COLUMN

Java 8 并发: 原子变量和 ConcurrentMap

yy13818512006 / 3569人阅读

摘要:在有些情况下,原子操作可以在不使用关键字和锁的情况下解决多线程安全问题。但其内部的结果不是一个单一的值这个类的内部维护了一组变量来减少多线程的争用。当来自多线程的更新比读取更频繁时这个类往往优于其他的原子类。

原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
AtomicInteger

java.concurrent.atomic 包下有很多原子操作的类。 在有些情况下,原子操作可以在不使用 synchronized 关键字和锁的情况下解决多线程安全问题。

在内部,原子类大量使用 CAS, 这是大多数现在 CPU 支持的原子操作指令, 这些指令通常情况下比锁同步要快得多。如果需要同时改变一个变量, 使用原子类是极其优雅的。

现在选择一个原子类 AtomicInteger 作为例子

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000

使用 AtomicInteger 代替 Integer 可以在线程安全的环境中增加变量, 而不要同步访问变量。incrementAndGet() 方法是一个原子操作, 我们可以在多线程中安全的调用。

AtomicInteger 支持多种的原子操作, updateAndGet() 方法接受一个 lambda 表达式,以便对整数做任何的算术运算。

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000

accumulateAndGet() 方法接受一个 IntBinaryOperator 类型的另一种 lambda 表达式, 我们是用这种方法来计算 1 -- 999 的和:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500

还有一些其他的原子操作类: AtomicBoolean AtomicLong AtomicReference

LongAdder

作为 AtomicLong 的替代, LongAdder 类可以用来连续地向数字添加值。

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000

LongAdder 类和其他的整数原子操作类一样提供了 add()increment() 方法, 同时也是线程安全的。但其内部的结果不是一个单一的值, 这个类的内部维护了一组变量来减少多线程的争用。实际结果可以通过调用 sum()sumThenReset() 来获取。

当来自多线程的更新比读取更频繁时, 这个类往往优于其他的原子类。通常作为统计数据, 比如要统计 web 服务器的请求数量。 LongAdder 的缺点是会消耗更多的内存, 因为有一组变量保存在内存中。

LongAccumulator

LongAccumulatorLongAdder 的一个更通用的版本。它不是执行简单的添加操作, 类 LongAccumulator 围绕 LongBinaryOperator 类型的lambda表达式构建,如代码示例中所示:

LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539

我们使用函数 2 * x + y 和初始值1创建一个 LongAccumulator。 每次调用 accumulate(i) , 当前结果和值i都作为参数传递给`lambda 表达式。

LongAdder 一样, LongAccumulator 在内部维护一组变量以减少对线程的争用。

ConcurrentMap

ConcurrentMap 接口扩展了 Map 接口,并定义了最有用的并发集合类型之一。 Java 8 通过向此接口添加新方法引入了函数式编程。

在下面的代码片段中, 来演示这些新的方法:

ConcurrentMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

forEach() 接受一个类型为 BiConsumerlambda 表达式, 并将 mapkeyvalue 作为参数传递。

map.forEach((key, value) -> System.out.printf("%s = %s
", key, value));

putIfAbsent() 方法只有当给定的 key 不存在时才将数据存入 map 中, 这个方法和 put 一样是线程安全的, 当多个线程访问 map 时不要做同步操作。

String value = map.putIfAbsent("c3", "p1");
System.out.println(value);    // p0

getOrDefault() 方法返回给定 keyvalue, 当 key 不存在时返回给定的值。

String value = map.getOrDefault("hi", "there");
System.out.println(value);    // there

replaceAll() 方法接受一个 BiFunction 类型的 lambda 表达式, 并将 keyvalue 作为参数传递,用来更新 value

map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2"));    // d3

compute() 方法和 replaceAll() 方法有些相同, 不同的是它多一个参数, 用来更新指定 keyvalue

map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo"));   // barbar
ConcurrentHashMap

以上所有方法都是 ConcurrentMap 接口的一部分,因此可用于该接口的所有实现。 此外,最重要的实现 ConcurrentHashMap 已经进一步增强了一些新的方法来在 Map 上执行并发操作。

就像并行流一样,这些方法在 Java 8 中通过 ForkJoinPool.commonPool()提供特殊的 ForkJoinPool 。该池使用预设的并行性, 这取决于可用内核的数量。 我的机器上有四个CPU内核可以实现三种并行性:

System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

通过设置以下 JVM 参数可以减少或增加此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

我们使用相同的示例来演示, 不过下面使用 ConcurrentHashMap 类型, 这样可以调用更多的方法。

ConcurrentHashMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Java 8 引入了三种并行操作:forEach, searchreduce。 每个操作都有四种形式, 分别用 key, value, entrieskey-value 来作为参数。

所有这些方法的第一个参数都是 parallelismThreshold 阀值。 该阈值表示操作并行执行时的最小收集大小。 例如, 如果传递的阈值为500,并且 map 的实际大小为499, 则操作将在单个线程上按顺序执行。 在下面的例子中,我们使用一个阈值来强制并行操作。

ForEach

方法 forEach() 能够并行地迭代 map 的键值对。 BiConsumer 类型的 lambda 表达式接受当前迭代的 keyvalue。 为了可视化并行执行,我们将当前线程名称打印到控制台。 请记住,在我的情况下,底层的 ForkJoinPool 最多使用三个线程。

map.forEach(1, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s
",
        key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
Search

search() 方法接受一个 BiFunction 类型的 lambda 表达式, 它能对 map 做搜索操作, 如果当前迭代不符合所需的搜索条件,则返回 null。 请记住,ConcurrentHashMap 是无序的。 搜索功能不应该取决于地图的实际处理顺序。 如果有多个匹配结果, 则结果可能是不确定的。

String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

下面是对 value 的搜索

String result = map.searchValues(1, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() > 3) {
        return value;
    }
    return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
Reduce

reduce() 方法接受两个类型为 BiFunctionlambda 表达式。 第一个函数将每个键值对转换为任何类型的单个值。 第二个函数将所有这些转换后的值组合成一个结果, 其中火忽略 null 值。

String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68434.html

相关文章

  • Java 8 并发教程:原子变量 ConcurrentMa

    摘要:并发教程原子变量和原文译者飞龙协议欢迎阅读我的多线程编程系列教程的第三部分。如果你能够在多线程中同时且安全地执行某个操作,而不需要关键字或上一章中的锁,那么这个操作就是原子的。当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。 Java 8 并发教程:原子变量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...

    bitkylin 评论0 收藏0
  • Java™ 教程(高级并发对象)

    高级并发对象 到目前为止,本课程重点关注从一开始就是Java平台一部分的低级别API,这些API适用于非常基础的任务,但更高级的任务需要更高级别的构建块,对于充分利用当今多处理器和多核系统的大规模并发应用程序尤其如此。 在本节中,我们将介绍Java平台5.0版中引入的一些高级并发功能,大多数这些功能都在新的java.util.concurrent包中实现,Java集合框架中还有新的并发数据结构。 ...

    xiaotianyi 评论0 收藏0
  • Java 8 并发教程:同步

    摘要:在接下来的分钟,你将会学会如何通过同步关键字,锁和信号量来同步访问共享可变变量。所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。 原文:Java 8 Concurrency Tutorial: Synchronization and Locks译者:飞龙 协议:CC BY-NC-SA 4.0 欢迎阅读我的Java8并发教程的第二部分。这份指南将...

    wyk1184 评论0 收藏0
  • Java 并发方案全面学习总结

    摘要:进程线程与协程它们都是并行机制的解决方案。选择是任意性的,并在对实现做出决定时发生。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此线程池支持定时以及周期性执行任务的需求。 并发与并行的概念 并发(Concurrency): 问题域中的概念—— 程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件 并行(Parallel...

    mengera88 评论0 收藏0
  • Java 8 并发教程:线程执行器

    摘要:在这个示例中我们使用了一个单线程线程池的。在延迟消逝后,任务将会并发执行。这是并发系列教程的第一部分。第一部分线程和执行器第二部分同步和锁第三部分原子操作和 Java 8 并发教程:线程和执行器 原文:Java 8 Concurrency Tutorial: Threads and Executors 译者:BlankKelly 来源:Java8并发教程:Threads和Execut...

    jsdt 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<