
本文共 11293 字,大约阅读时间需要 37 分钟。
就人的性格而言,可以分为乐天派和悲观派。对于乐天派来说,他们总是会把事情往好的方面想。他们认为所有事情总是不太容易发生问题,出错是小概率的,因此可以大胆地做事。如果真的不幸遇到了问题,则努力解决问题。而对于悲观的人来说,他们总是担惊受怕,认为出错是一种常态,所以无论大小事情都考虑得面面俱到,为人处世,确保万无一失。
对于并发控制而言,锁是一种悲观的策略。它总是假设每一次的临界区操作会产生冲突,因此,必须对每次操作都小心翼翼。如果有多个线程同时需要访问临界区资源,则宁可牺牲性能让线程进行等待,所以说锁会阻塞线程执行。而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的。既然没有冲突,自然不需要等待,所以所有的线程都可以在不停顿的状态下持续执行。那遇到冲突怎么办呢?无锁的策略使用一种叫作比较交换(CAS,Compare And Swap)的技术来鉴别线程冲突,一旦检测到冲突产生,就重试当前操作直到没有冲突为止。
文章目录
一、与众不同的并发策略:比较交换
与锁相比,使用比较交换会使程序看起来更加复杂一些,但由于其非阻塞性,它对死锁问题天生免疫,并且线程间的相互影响也远远比基于锁的方式要小。更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销,因此,它要比基于锁的方式拥有更优越的性能。
CAS算法的过程是:它包含三个参数CAS(V,E,N),其中V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
简单地说,CAS需要你额外给出一个期望值,也就是你认为这个变量现在应该是什么样子的。如果变量不是你想象的那样,则说明它已经被别人修改过了。你就重新读取,再次尝试修改就好了。
在硬件层面,大部分的现代处理器都已经支持原子化的CAS指令。在JDK 5以后,虚拟机便可以使用这个指令来实现并发操作和并发数据结构,并且这种操作在虚拟机中可以说是无处不在的。
二、无锁的线程安全整数:AtomicInteger
为了让Java程序员能够受益于CAS等CPU指令,JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型。
其中,最常用的一个类就是AtomicInteger
,可以把它看作一个整数。与Integer不同,它是可变的,并且是线程安全的。对其进行修改等任何操作都是用CAS指令进行的。这里简单列举一下AtomicInteger
的一些主要方法,对于其他原子类,操作也是非常类似的。

AtomicInteger的使用非常简单,这里给出一个示例:
使用AtomicInteger会比使用锁具有更好的性能。由于篇幅限制,这里不再给出AtomicInteger和锁的性能对比的测试代码,相信写一段简单的代码测试两者的性能应该不是难事。这里让我们关注一下incrementAndGet()方法的内部实现(基于对JDK 1.7的分析可知,JDK 1.8与JDK 1.7的实现有所不同)。
get()
方法非常简单,就是返回内部数据value。 以上就是CAS操作的基本思想,无论程序多么复杂,其基本原理总是不变的。
和AtomicInteger类似的类还有:AtomicLong用来代表long型数据;AtomicBoolean表示boolean型数据;AtomicReference表示对象引用。
三、Java中的指针:Unsafe类
如果你对技术有追求,应该还会特别在意incrementAndGet() 方法中compareAndSet()方法的实现。现在,就让我们进一步看一下它吧!
而这里的Unsafe类就是封装了一些类似指针的操作。compareAndSwapInt()方法是一个navtive方法,它的几个参数含义如下:
不难看出,compareAndSwapInt()方法的内部,必然是使用CAS原子指令来完成的。此外,Unsafe类还提供了一些方法,主要有以下几种(以int操作为例,其他数据类型是类似的):
这里就可以看到,虽然Java抛弃了指针,但是在关键时刻,类似指针的技术还是必不可少的。这里底层的Unsafe类实现就是最好的例子。但是很不幸,JDK的开发人员并不希望大家使用这个类。获得Unsafe类实例的方法是调动其工厂方法getUnsafe(),但是它的实现却是这样的:
注意:根据Java类加载器的工作原理,应用程序的类由App Loader加载。而系统核心类,如rt.jar中的类由Bootstrap类加载器加载。Bootstrap类加载器没有Java对象的对象,因此试图获得这个类加载器会返回null。所以,当一个类的类加载器为null时,说明它是由Bootstrap类加载器加载的,而这个类也极有可能是rt.jar中的类。
四、无锁的对象引用:AtomicReference
AtomicReference和AtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference则是对应普通的对象引用。也就是它可以保证你在修改对象引用时的线程安全性。在介绍AtomicReference的同时,我希望同时提出一个有关原子操作的逻辑上的不足。
之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的。但有可能出现一个小小的例外,就是当你获得对象当前数据后,在准备修改为新值前,对象的值被其他线程连续修改了两次,而经过这两次修改后,对象的值又恢复为旧值。这样,当前线程就无法正确判断这个对象究竟是否被修改过,图4.2显示了这种情况。
一般来说,发生这种情况的概率很小,即使发生了,可能也不是什么大问题。比如,我们只是简单地要做一个数值加法,即使在取得期望值后,这个数字被不断地修改,只要它最终改回了我的期望值,我的加法计算就不会出错。也就是说,当你修改的对象没有过程的状态信息时,所有的信息都只保存于对象的数值本身。
但是,在现实中,还可能存在另外一种场景,就是我们是否能修改对象的值,不仅取决于当前值,还和对象的过程变化有关,这时,AtomicReference就无能为力了
打一个比方,有一家蛋糕店,为了挽留客户,决定为贵宾卡里余额小于20元的客户一次性赠送20元,刺激客户充值和消费,但条件是,每一位客户只能被赠送一次。
现在,我们就来模拟这个场景,为了演示AtomicReference,我在这里使用AtomicReference实现这个功能。首先,我们模拟客户账户余额。
如果在赠予金额到账的同时,客户进行了一次消费,使得总金额又小于20元,并且正好累计消费了20元。使得消费、赠予后的金额等于消费前、赠予前的金额,那么后台的赠予进程就会误以为这个账户还没有赠予,所以,存在被多次赠予的可能。模拟这个消费线程:

虽然这种情况出现的概率不大,但是依然是有可能出现的。因此,当业务上确实可能出现这种情况时,我们也必须多加防范。JDK也已经为我们考虑到了这种情况,使用AtomicStampedReference就可以很好地解决这个问题。
五、带有时间戳的对象引用:AtomicStampedReference
AtomicReference无法解决上述问题的根本原因是,对象在修改过程中丢失了状态信息,对象值本身与状态被画上了等号。因此,我们只要能够记录对象在修改过程中的状态值,就可以很好地解决对象被反复修改导致线程无法正确判断对象状态的问题
AtomicStampedReference正是这么做的。它内部不仅维护了对象值,还维护了一个时间戳(我这里把它称为时间戳,实际上它可以使任何一个整数来表示状态值)。当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入。
AtomicStampedReference的几个API在AtomicReference的基础上新增了有关时间戳的信息。

六、数组也能无锁:AtomicIntegerArray
除提供基本数据类型以外,JDK还为我们准备了数组等复合结构。当前可用的原子数组有:AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray,分别表示整数数组、long型数组和普通的对象数组。
这里以AtomicIntegerArray为例,展示原子数组的使用方式。
AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性。它提供了以下几个核心API。

七、让普通变量也享受原子操作:AtomicIntegerFieldUpdater
有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。如果改动不大,则可以简单地修改程序中每一个使用或者读取这个变量的地方。但显然,这样并不符合软件设计中的一条重要原则—开闭原则。也就是系统对功能的增加应该是开放的,而对修改应该是相对保守的。而且,如果系统里使用到这个变量的地方特别多,一个一个修改也是一件令人厌烦的事情(况且很多使用场景下可能是只读的,并无线程安全的强烈要求,完全可以保持原样)。
如果你有这种困扰,在这里根本不需要担心,因为在原子包里还有一个实用的工具类AtomicIntegerFieldUpdater。它可以让你在不改动(或者极少改动)原有代码的基础上,让普通的变量也享受CAS操作带来的线程安全性,这样你可以通过修改极少的代码来获得线程安全的保证。这听起来是不是让人很激动呢?
根据数据类型不同,Updater有三种,分别是AtomicIntegerFieldUpdater、AtomicLong-FieldUpdater和AtomicReferenceFieldUpdater。顾名思义,它们分别可以对int、long和普通对象进行CAS修改。
现在来思考这么一个场景:假设某地要进行一次选举。现在模拟这个投票场景,如果选民投了候选人一票,就记为1,否则记为0。最终的选票显然就是所有数据的简单求和。
大家如果运行这段程序,不难发现,最终的Candidate.score总是和allScore绝对相等。这说明AtomicIntegerFieldUpdater很好地保证了Candidate.score的线程安全。
虽然AtomicIntegerFieldUpdater很好用,但是还是有几个注意事项。
第一,Updater只能修改它可见范围内的变量,因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如score声明为private,就是不可行的。
第二,为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未声明这个类型,那么简单地声明一下就行,这不会引起什么问题。
第三,由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()方法不支持静态变量)。
通过AtomicIntegerFieldUpdater,我们可以更加随心所欲地对系统关键数据进行线程安全的保护。
八、挑战无锁算法:无锁的Vector实现
我们已经比较完整地介绍了有关无锁的概念和使用方法。相对于有锁的方法,使用无锁的方式编程更加考验一个程序员的耐心和智力。但是,无锁带来的好处也是显而易见的,第一,在高并发的情况下,它比有锁的程序拥有更好的性能;第二,它天生就是死锁免疫的。就凭借这两个优势,就值得我们冒险尝试使用无锁并发。
这里向大家介绍一种使用无锁方式实现的Vector。通过这个案例,我们可以更加深刻地认识无锁的算法,同时也可以学习一下有关Vector实现的细节和算法技巧(本例讲述的无锁Vector来自amino并发包)。
我们将这个无锁的Vector称为LockFreeVector。它的特点是可以根据需求动态扩展其内部空间。在这里,我们使用二维数组来表示LockFreeVector的内部存储。
此外,为了更有序的读写数组,定义一个称为Descriptor的元素。它的作用是使用CAS操作写入新数据。

第24行WriteDescriptor的构造函数接收四个参数。第一个参数addr表示要修改的原子数组,第二个参数为要写入的数组索引位置,第三个oldV为期望值,第四个newV为需要写入的值。
在构造LockFreeVector时,显然需要将buckets和descriptor进行初始化。

在循环最开始(第5行),使用descriptor先将数据写入数组,是为了防止上一个线程设置完descriptor后(第22行),还没来得及执行第23行的写入,因此做一次预防性操作。
因为限制要将元素e压入Vector,所以我们必须首先知道这个e应该放在哪个位置。由于目前使用了二维数组,因此我们自然需要知道e所在的数组(buckets中的下标位置)和数组中的下标。
第8~10行通过当前Vector的大小(desc.size),计算新的元素应该落入哪个数组。这里使用了位运算进行计算。也许你会觉得这几行代码看起来有些奇怪,我的解释如下:LockFreeVector每次都会成倍的扩容。它的第1个数组长度为8,第2个就是16,第3个就是32,依此类推。它们的二进制表示就是:
导致这个数字进位的最小条件,就是加上二进制的1000。而这个数字正好是8(FIRST_BUCKET_SIZE就是8)。这就是第8行代码的意义。它可以使得数组大小发生一次二进制的进位(如果不进位说明还在第一个数组中),进位后前导零的数量就会发生变化。而元素所在的数组,和pos(第8行定义的变量)的前导零直接相关。每进行一次数组扩容,它的前导零就会减1。如果从来没有扩容过,那么它的前导零就是28个,以后逐级减1。这就是第9行获得pos前导零的原因。第10行,通过pos的前导零可以立即定位使用哪个数组(也就是得到了bucketInd的值)。
到此,我们就已经得到新元素位置的全部信息,剩下的就是将这些信息传递给Descriptor,让它在给定的位置把元素e安置上去即可。这里通过CAS操作,保证写入正确性。
九、让线程之间互相帮助:细看SynchronousQueue的实现
在对线程池的介绍中,提到了一个非常特殊的等待队列SynchronousQueue。SynchronousQueue的容量为0,任何一个对SynchronousQueue的写需要等待一个对SynchronousQueue的读,反之亦然。因此,SynchronousQueue与其说是一个队列,不如说是一个数据交换通道。那SynchronousQueue的奇妙功能是如何实现的呢?
SynchronousQueue和无锁的操作脱离不了关系,实际上SynchronousQueue内部也大量使用了无锁工具。
对SynchronousQueue来说,它将put()和take()两种功能截然不同的方法抽象为一个共同的方法Transferer.transfer()。从字面上看,这就是数据传递的意思。它的完整签名如下:
SynchronousQueue内部会维护一个线程等待队列。等待队列中会保存等待线程及相关数据的信息。比如,生产者将数据放入SynchronousQueue时,如果没有消费者接收,那么数据本身和线程对象都会打包在队列中等待(因为SynchronousQueue容积为0,没有数据可以正常放入)。
Transferer.transfer()函数的实现是SynchronousQueue的核心,它大体上分为三个步骤。
(1)如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。比如,等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待。进入等待队列的线程可能会被挂起,它们会等待一个“匹配”操作。
(2)如果等待队列中的元素和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让它“匹配”到一个等待节点上。接着弹出这两个节点,并且使得对应的两个线程继续执行。 (3)如果线程发现等待队列的节点就是“完成”节点,那么帮助这个节点完成任务,其流程和步骤(2)是一致的。从整个数据投递的过程中可以看到,在SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系,更重要的是,它们彼此之间还会互相帮助。在一个线程内部,可能会帮助其他线程完成它们的工作。这种模式可以在更大程度上减少饥饿的可能,提高系统整体的并行度。
发表评论
最新留言
关于作者
