资讯专栏INFORMATION COLUMN

Java 8 并发教程:原子变量和 ConcurrentMa

bitkylin / 2324人阅读

摘要:并发教程原子变量和原文译者飞龙协议欢迎阅读我的多线程编程系列教程的第三部分。如果你能够在多线程中同时且安全地执行某个操作,而不需要关键字或上一章中的锁,那么这个操作就是原子的。当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。

Java 8 并发教程:原子变量和 ConcurrentMap

原文:Java 8 Concurrency Tutorial: Synchronization and Locks

译者:飞龙

协议:CC BY-NC-SA 4.0

欢迎阅读我的Java8多线程编程系列教程的第三部分。这个教程包含并发API的两个重要部分:原子变量和ConcurrentMap。由于最近发布的Java8中的lambda表达式和函数式编程,二者都有了极大的改进。所有这些新特性会以一些简单易懂的代码示例来描述。希望你能喜欢。

第一部分:线程和执行器

第二部分:同步和锁

第三部分:原子变量和 ConcurrentMap

出于简单的因素,这个教程的代码示例使用了定义在这里的两个辅助函数sleep(seconds)stop(executor)

AtomicInteger

java.concurrent.atomic包包含了许多实用的类,用于执行原子操作。如果你能够在多线程中同时且安全地执行某个操作,而不需要synchronized关键字或上一章中的锁,那么这个操作就是原子的。

本质上,原子操作严重依赖于比较与交换(CAS),它是由多数现代CPU直接支持的原子指令。这些指令通常比同步块要快。所以在只需要并发修改单个可变变量的情况下,我建议你优先使用原子类,而不是上一章展示的锁。

译者注:对于其它语言,一些语言的原子操作用锁实现,而不是原子指令。

现在让我们选取一个原子类,例如AtomicInteger

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000

通过使用AtomicInteger代替Integer,我们就能线程安全地并发增加数值,而不需要同步访问变量。incrementAndGet()方法是原子操作,所以我们可以在多个线程中安全调用它。

AtomicInteger支持多种原子操作。updateAndGet()接受lambda表达式,以便在整数上执行任意操作:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000

accumulateAndGet()方法接受另一种类型IntBinaryOperator的lambda表达式。我们在下个例子中,使用这个方法并发计算0~1000所有值的和:

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500

其它实用的原子类有AtomicBooleanAtomicLongAtomicReference

LongAdder

LongAdderAtomicLong的替代,用于向某个数值连续添加值。

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000

LongAdder提供了add()increment()方法,就像原子数值类一样,同样是线程安全的。但是这个类在内部维护一系列变量来减少线程之间的争用,而不是求和计算单一结果。实际的结果可以通过调用sum()sumThenReset()来获取。

当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。这种情况在抓取统计数据时经常出现,例如,你希望统计Web服务器上请求的数量。LongAdder缺点是较高的内存开销,因为它在内存中储存了一系列变量。

LongAccumulator

LongAccumulatorLongAdder的更通用的版本。LongAccumulator以类型为LongBinaryOperatorlambda表达式构建,而不是仅仅执行加法操作,像这段代码展示的那样:

LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539

我们使用函数2 * x + y创建了LongAccumulator,初始值为1。每次调用accumulate(i)的时候,当前结果和值i都会作为参数传入lambda表达式。

LongAccumulator就像LongAdder那样,在内部维护一系列变量来减少线程之间的争用。

ConcurrentMap

ConcurrentMap接口继承自Map接口,并定义了最实用的并发集合类型之一。Java8通过将新的方法添加到这个接口,引入了函数式编程。

在下面的代码中,我们使用这个映射示例来展示那些新的方法:

ConcurrentMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

forEach()方法接受类型为BiConsumer的lambda表达式,以映射的键和值作为参数传递。它可以作为for-each循环的替代,来遍历并发映射中的元素。迭代在当前线程上串行执行。

map.forEach((key, value) -> System.out.printf("%s = %s
", key, value));

新方法putIfAbsent()只在提供的键不存在时,将新的值添加到映射中。至少在ConcurrentHashMap的实现中,这一方法像put()一样是线程安全的,所以你在不同线程中并发访问映射时,不需要任何同步机制。

String value = map.putIfAbsent("c3", "p1");
System.out.println(value);    // p0

getOrDefault()方法返回指定键的值。在传入的键不存在时,会返回默认值:

String value = map.getOrDefault("hi", "there");
System.out.println(value);    // there

replaceAll()接受类型为BiFunction的lambda表达式。BiFunction接受两个参数并返回一个值。函数在这里以每个元素的键和值调用,并返回要映射到当前键的新值。

map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2"));    // d3

compute()允许我们转换单个元素,而不是替换映射中的所有值。这个方法接受需要处理的键,和用于指定值的转换的BiFunction

map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo"));   // barbar

除了compute()之外还有两个变体:computeIfAbsent()computeIfPresent()。这些方法的函数式参数只在键不存在或存在时被调用。

最后,merge()方法可以用于以映射中的现有值来统一新的值。这个方法接受键、需要并入现有元素的新值,以及指定两个值的合并行为的BiFunction

map.merge("foo", "boo", (oldVal, newVal) -> newVal + " was " + oldVal);
System.out.println(map.get("foo"));   // boo was foo
ConcurrentHashMap

所有这些方法都是ConcurrentMap接口的一部分,因此可在所有该接口的实现上调用。此外,最重要的实现ConcurrentHashMap使用了一些新的方法来改进,便于在映射上执行并行操作。

就像并行流那样,这些方法使用特定的ForkJoinPool,由Java8中的ForkJoinPool.commonPool()提供。该池使用了取决于可用核心数量的预置并行机制。我的电脑有四个核心可用,这会使并行性的结果为3:

System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

这个值可以通过设置下列JVM参数来增减:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

我们使用相同的映射示例来展示,但是这次我们使用具体的ConcurrentHashMap实现而不是ConcurrentMap接口,所以我们可以访问这个类的所有公共方法:

ConcurrentHashMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Java8引入了三种类型的并行操作:forEachsearchreduce。这些操作中每个都以四种形式提供,接受以键、值、元素或键值对为参数的函数。

所有这些方法的第一个参数是通用的parallelismThreshold。这一阈值表示操作并行执行时的最小集合大小。例如,如果你传入阈值500,而映射的实际大小是499,那么操作就会在单线程上串行执行。在下一个例子中,我们使用阈值1,始终强制并行执行来展示。

forEach

forEach()方法可以并行迭代映射中的键值对。BiConsumer以当前迭代元素的键和值调用。为了将并行执行可视化,我们向控制台打印了当前线程的名称。要注意在我这里底层的ForkJoinPool最多使用三个线程。

map.forEach(1, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s
",
        key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
search

search()方法接受BiFunction并为当前的键值对返回一个非空的搜索结果,或者在当前迭代不匹配任何搜索条件时返回null。只要返回了非空的结果,就不会往下搜索了。要记住ConcurrentHashMap是无序的。搜索函数应该不依赖于映射实际的处理顺序。如果映射的多个元素都满足指定搜索函数,结果是非确定的。

String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

下面是另一个例子,仅仅搜索映射中的值:

String result = map.searchValues(1, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() > 3) {
        return value;
    }
    return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
reduce

reduce()方法已经在Java 8 的数据流之中用过了,它接受两个BiFunction类型的lambda表达式。第一个函数将每个键值对转换为任意类型的单一值。第二个函数将所有这些转换后的值组合为单一结果,并忽略所有可能的null值。

String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar

我希望你能喜欢我的Java8并发系列教程的第三部分。这个教程的代码示例托管在Github上,还有许多其它的Java8代码片段。欢迎fork我的仓库并自己尝试。

如果你想要支持我的工作,请向你的朋友分享这篇教程。你也可以在Twiiter上关注我,因为我会不断推送一些Java或编程相关的东西。

第一部分:线程和执行器

第二部分:同步和锁

第三部分:原子变量和 ConcurrentMap

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64957.html

相关文章

  • Java 8 并发教程:同步

    摘要:在接下来的分钟,你将会学会如何通过同步关键字,锁和信号量来同步访问共享可变变量。所以在使用乐观锁时,你需要每次在访问任何共享可变变量之后都要检查锁,来确保读锁仍然有效。 原文:Java 8 Concurrency Tutorial: Synchronization and Locks译者:飞龙 协议:CC BY-NC-SA 4.0 欢迎阅读我的Java8并发教程的第二部分。这份指南将...

    wyk1184 评论0 收藏0
  • Java教程(同步)

    同步 线程主要通过共享对字段和引用对象的引用字段的访问来进行通信,这种通信形式非常有效,但可能产生两种错误:线程干扰和内存一致性错误,防止这些错误所需的工具是同步。 但是,同步可能会引入线程竞争,当两个或多个线程同时尝试访问同一资源并导致Java运行时更慢地执行一个或多个线程,甚至暂停它们执行,饥饿和活锁是线程竞争的形式。 本节包括以下主题: 线程干扰描述了当多个线程访问共享数据时如何引入错误。...

    Edison 评论0 收藏0
  • Java教程(高级并发对象)

    高级并发对象 到目前为止,本课程重点关注从一开始就是Java平台一部分的低级别API,这些API适用于非常基础的任务,但更高级的任务需要更高级别的构建块,对于充分利用当今多处理器和多核系统的大规模并发应用程序尤其如此。 在本节中,我们将介绍Java平台5.0版中引入的一些高级并发功能,大多数这些功能都在新的java.util.concurrent包中实现,Java集合框架中还有新的并发数据结构。 ...

    xiaotianyi 评论0 收藏0
  • JAVA 多线程并发基础面试问答

    摘要:多线程和并发问题是技术面试中面试官比较喜欢问的问题之一。线程可以被称为轻量级进程。一个守护线程是在后台执行并且不会阻止终止的线程。其他的线程状态还有,和。上下文切换是多任务操作系统和多线程环境的基本特征。 多线程和并发问题是 Java 技术面试中面试官比较喜欢问的问题之一。在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题。(...

    dreamans 评论0 收藏0
  • Java核心技术教程整理,长期更新

    以下是Java技术栈微信公众号发布的关于 Java 的技术干货,从以下几个方面汇总。 Java 基础篇 Java 集合篇 Java 多线程篇 Java JVM篇 Java 进阶篇 Java 新特性篇 Java 工具篇 Java 书籍篇 Java基础篇 8张图带你轻松温习 Java 知识 Java父类强制转换子类原则 一张图搞清楚 Java 异常机制 通用唯一标识码UUID的介绍及使用 字符串...

    Anchorer 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<