原子变量与并发集合的示例
文章翻译于: http://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/
欢迎阅读我的Java 8中多线程编程教程系列的第三部分。本教程介绍了并发API的两个重要部分:原子变量和并发集合。 在最新的Java 8版本中引入了lambda表达式和功能编程,两者都得到了很大的改进。 所有这些新功能都用一大堆易于理解的代码示例进行描述。请享用!
为了简单起见,本教程的代码示例使用这里定义的两个辅助方法sleep(seconds)
和stop(executor)
。
原子整型(AtomicInteger)
java.concurrent.atomic
包包含许多有用的类来执行原子操作。 当你可以安全地在多个线程上并行执行操作时,操作是原子的,而不使用我以前的教程中所示的synchronized
关键字或锁。
在内部,原子类大量使用比较和交换(CAS),这是大多数现代CPU直接支持的原子指令。那些指令通常比同步锁快得多。 所以我的建议是在需要同时更改单个可变变量时使用原子类。
现在让我们选择一个原子类来举几个例子:AtomicInteger
1 |
|
通过使用AtomicInteger
替代Integer
,我们可以在线程安全的情况下同时增加数量,而不需要同步对变量的访问。 方法incrementAndGet()
是一个原子操作,所以我们可以从多个线程安全地调用这个方法。
AtomicInteger
支持各种原子操作。 方法updateAndGet()
接受一个lambda表达式,以便对整数执行任意的算术运算:
1 |
|
方法accumulateAndGet()
接受另一种类型为IntBinaryOperator
的lambda表达式。 我们使用这种方法在下一个示例中将并发把从0到1000的值求和:
1 |
|
其他有用的原子类是AtomicBoolean
,AtomicLong
和AtomicReference
。
长整型递增类(LongAdder)
可以使用LongAdder
类作为AtomicLong
的替代方法来连续地向数字添加值。
1 |
|
LongAdder
提供了方法add()
和increment()
,就像原子数字类一样,也是线程安全的。 但是,除了总结单个结果之外,这个类在内部维护一组变量以减少对线程的争用。 实际结果可以通过调用sum()
或sumThenReset()
来检索。
当多线程的更新比读取更常见时,此类通常优于原子数字类。在捕获统计数据时,通常是这种情况。 您想要计算在Web服务器上提供的请求数。 LongAdder
的缺点是更高的内存消耗,因为一组变量被保存在内存中。
长整型运算类(LongAccumulator)
LongAccumulator
是LongAdder
的更广泛版本。 代替执行简单的添加操作,类LongAccumulator
构建了LongBinaryOperator
类型的lambda表达式,如此代码示例所示:
1 |
|
我们创建一个具有函数2 * x + y
和初始值为1的LongAccumulator
。 每次调用accumulate(i)
当前结果和值i
都作为参数传递给lambda表达式。
LongAccumulator
就像LongAdder
一样,在内部维护一组变量以减少与线程的争用。
并发映射接口(ConcurrentMap)
ConcurrentMap接口扩展了map接口,并定义了最有用的并发集合类型之一。 Java 8 通过向此接口添加新方法来引入函数式编程。
在下面的代码段中,我们使用以下示例map来演示这些新方法:
1 |
|
方法forEach()接受一个类型为BiConsumer
的lambda表达式,同时具有作为参数传递的map的键和值。 它可以用作替代每个循环来遍历并发map的条目。迭代在当前线程上顺序执行。
1 |
|
方法putIfAbsent()
只有在给定键不存在任何值时,才会将新值放入map。 至少对于ConcurrentHashMap
,该方法的实现是线程安全的,就像put()
一样,所以你不必同步从不同的线程同时访问map:
1 |
|
getOrDefault()
方法返回给定键的值。如果此键不存在,则返回传递的默认值:
1 |
|
replaceAll()
方法接受一个类型为BiFunction
的lambda表达式。 BiFunctions
需要两个参数并返回一个值。 在这种情况下,使用键和每个映射条目的值调用该函数,并返回要为当前键值分配的新值:
1 |
|
compute()
让我们转换单个实体,而不是替换所有的。该方法接受要计算的键值和一个BiFunction
类的转换函数。
1 |
|
除了compute()
,还有两个变量:computeIfAbsent()
和computeIfPresent()
。 这些方法的功能参数只有在键不存在或分别存在的情况下才被调用。
最后,可以使用merge()
方法将新值与map中的现有值进行统一。 合并接受一个键值,要合并到现有条目中的新值和一个双功能来指定两个值的合并行为:
1 |
|
并发哈希映射类(ConcurrentHashMap)
以上所有这些方法都是ConcurrentMap
接口的一部分,从而可用于该接口的所有实现。 此外,最重要的实现ConcurrentHashMap
已经通过几种新方法进一步增强,以在map上执行并行操作。
就像并行流一样,这些方法使用Java 8中的ForkJoinPool.commonPool()
可以使用一个特殊的ForkJoinPool
。该池使用一个取决于可用内核数量的预设并行度。 我的机器上有四个CPU内核可以实现三个并行处理:
1 |
|
可以通过设置以下JVM参数来减小或增加该值:
1 |
|
我们使用相同的示例map来进行演示,但是这次我们通过具体实现ConcurrentHashMap
来代替ConcurrentMap
,所以我们可以从这个类访问所有的公共方法:
1 |
|
Java 8引入了三种并行操作:forEach
,search
,reduce
。 这些操作中的每一个都有四种形式接受具有键,值,实体和键值对参数的函数。
所有这些方法都使用一个共同的第一个参数,称为parallelismThreshold
。 该阈值表示并行执行操作时的最小收集大小。 例如。 如果通过阈值500,并且map的实际大小为499,则操作将在单个线程上顺序执行。 在下面的例子中,我们使用一个阈值来总是强制执行并行执行来进行演示。
ForEach
方法forEach()
能够并行迭代map的键值对。 使用当前迭代步骤的键和值调用类型BiConsumer
的lambda表达式。 为了可视化并行执行,我们将当前线程名称打印到控制台。 注意在我的情况下,底层的ForkJoinPool
最多使用三个线程。
1 |
|
Search
方法search()
接受返回当前键值对的非空搜索结果的BiFunction
,如果当前迭代不符合所需的搜索条件,则返回null
。 一旦返回非空结果,这个线程就终止了。 注意ConcurrentHashMap
是无序的。搜索功能不应取决于map的实际处理顺序。如果map的多个实体与给定的搜索函数匹配,则结果可能是非确定性的。
1 |
|
以下是另一个仅查看map的值的示例:
1 |
|
Reduce
Java 8 Streams中已知的方法reduce()
可以接受两种类型为BiFunction
的lambda表达式。 第一个函数将每个键值对转换为任何类型的单个值。 第二个功能将所有这些变换的值组合成一个单独的结果,忽略任何可能的空值。
1 |
|
我希望你喜欢阅读我的有关Java 8并发的教程系列的第三部分。 本教程的代码示例与许多其他Java 8代码片段一起托管在GitHub上。 欢迎你fork并自行尝试。
如果您想支持我的工作,请与您的朋友分享本教程。你也应该跟随我在Twitter上,因为我不断推一些Java和编程相关的东西。
(翻译完)