JUC笔记(二)并发编程核心

多线程编程核心

在前面,我们了解了多线程的底层运作机制,我们终于知道,原来多线程环境下存在着如此之多的问题。在JDK5之前,我们只能选择synchronized关键字来实现锁,而JDK5之后,由于volatile关键字得到了升级(具体功能就是上一章所描述的),所以并发框架包便出现了,相比传统的synchronized关键字,我们对于锁的实现,有了更多的选择。
Doug Lea — JUC并发包的作者
如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算机科学系的老大爷。
说他是这个世界上对Java影响力最大的一个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。
那么,从这章开始,就让我们来感受一下,JUC为我们带来了什么。

锁框架

在JDK 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。

Lock和Condition接口

使用并发包中的锁和我们传统的synchronized锁不太一样,这里的锁我们可以认为是一把真正意义上的锁,每个锁都是一个对应的锁对象,我只需要向锁对象获取锁或是释放锁即可。我们首先来看看,此接口中定义了什么:
这里我们可以演示一下,如何使用Lock类来进行加锁和释放锁操作:
可以看到,和我们之前使用synchronized相比,我们这里是真正在操作一个”锁”对象,当我们需要加锁时,只需要调用lock()方法,而需要释放锁时,只需要调用unlock()方法。程序运行的最终结果和使用synchronized锁是一样的。
那么,我们如何像传统的加锁那样,调用对象的wait()notify()方法呢,并发包提供了Condition接口:
这里我们通过一个简单的例子来演示一下:
可以发现,Condition对象使用方法和传统的对象使用差别不是很大。
思考: 下面这种情况跟上面有什么不同?
通过分析可以得到,在调用newCondition()后,会生成一个新的Condition对象,并且同一把锁内是可以存在多个Condition对象的(实际上原始的锁机制等待队列只能有一个,而这里可以创建很多个Condition来实现多等待队列),而上面的例子中,实际上使用的是不同的Condition对象,只有对同一个Condition对象进行等待和唤醒操作才会有效,而不同的Condition对象是分开计算的。
最后我们再来讲解一下时间单位,这是一个枚举类,也是位于java.util.concurrent包下:
可以看到时间单位有很多的,比如DAYSECONDSMINUTES等,我们可以直接将其作为时间单位,比如我们要让一个线程等待3秒钟,可以像下面这样编写:
当然,Lock类的tryLock方法也是支持使用时间单位的,各位可以自行进行测试。TimeUnit除了可以作为时间单位表示以外,还可以在不同单位之间相互转换:
也可以更加便捷地使用对象的wait()方法:
我们也可以直接使用它来进行休眠操作:

可重入锁

前面,我们讲解了锁框架的两个核心接口,那么我们接着来看看锁接口的具体实现类,我们前面用到了ReentrantLock,它其实是锁的一种,叫做可重入锁,那么这个可重入代表的是什么意思呢?简单来说,就是同一个线程,可以反复进行加锁操作:
可以看到,主线程连续进行了两次加锁操作(此操作是不会被阻塞的),在当前线程持有锁的情况下继续加锁不会被阻塞,并且,加锁几次,就必须要解锁几次,否则此线程依旧持有锁。我们可以使用getHoldCount()方法查看当前线程的加锁次数:
可以看到,当锁不再被任何线程持有时,值为0,并且通过isLocked()方法查询结果为false
实际上,如果存在线程持有当前的锁,那么其他线程在获取锁时,是会暂时进入到等待队列的,我们可以通过getQueueLength()方法获取等待中线程数量的预估值:
我们可以通过hasQueuedThread()方法来判断某个线程是否正在等待获取锁状态。
同样的,Condition也可以进行判断:
通过使用getWaitQueueLength()方法能够查看同一个Condition目前有多少线程处于等待状态。

公平锁与非公平锁

前面我们了解了如果线程之间争抢同一把锁,会暂时进入到等待队列中,那么多个线程获得锁的顺序是不是一定是根据线程调用lock()方法时间来定的呢,我们可以看到,ReentrantLock的构造方法中,是这样写的:
其实锁分为公平锁和非公平锁,默认我们创建出来的ReentrantLock是采用的非公平锁作为底层锁机制。那么什么是公平锁什么又是非公平锁呢?
  • 公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
  • 非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
简单来说,公平锁不让插队,都老老实实排着;非公平锁让插队,但是排队的人让不让你插队就是另一回事了。
我们可以来测试一下公平锁和非公平锁的表现情况:
这里我们选择使用第二个构造方法,可以选择是否为公平锁实现:
这里我们只需要对比将在1秒后开始获取锁...成功获取锁!的顺序是否一致即可,如果是一致,那说明所有的线程都是按顺序排队获取的锁,如果不是,那说明肯定是有线程插队了。
运行结果可以发现,在公平模式下,确实是按照顺序进行的,而在非公平模式下,一般会出现这种情况:线程刚开始获取锁马上就能抢到,并且此时之前早就开始的线程还在等待状态,很明显的插队行为。
那么,接着下一个问题,公平锁在任何情况下都一定是公平的吗?有关这个问题,我们会留到队列同步器中再进行讨论。

读写锁

除了可重入锁之外,还有一种类型的锁叫做读写锁,当然它并不是专门用作读写操作的锁,它和可重入锁不同的地方在于,可重入锁是一种排他锁,当一个线程得到锁之后,另一个线程必须等待其释放锁,否则一律不允许获取到锁。而读写锁在同一时间,是可以让多个线程获取到锁的,它其实就是针对于读写场景而出现的。
读写锁维护了一个读锁和一个写锁,这两个锁的机制是不同的。
  • 读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。
  • 写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。
读写锁也有一个专门的接口:
此接口有一个实现类ReentrantReadWriteLock(实现的是ReadWriteLock接口,不是Lock接口,它本身并不是锁),注意我们操作ReentrantReadWriteLock时,不能直接上锁,而是需要获取读锁或是写锁,再进行锁操作:
这里我们对读锁加锁,可以看到可以多个线程同时对读锁加锁。
有读锁状态下无法加写锁,反之亦然:
并且,ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:
通过之前的例子来验证公平和非公平:
可以看到,结果是一致的。

锁降级和锁升级

锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的:
那么,如果我们在同时加了写锁和读锁的情况下,释放写锁,是否其他的线程就可以一起加读锁了呢?
可以看到,一旦写锁被释放,那么主线程就只剩下读锁了,因为读锁可以被多个线程共享,所以这时第二个线程也添加了读锁。而这种操作,就被称之为”锁降级”(注意不是先释放写锁再加读锁,而是持有写锁的情况下申请读锁再释放写锁)
注意在仅持有读锁的情况下去申请写锁,属于”锁升级”,ReentrantReadWriteLock是不支持的:
可以看到线程直接卡在加写锁的那一句了。

队列同步器AQS

注意: 难度巨大,如果对锁的使用不是很熟悉建议之后再来看!
前面我们了解了可重入锁和读写锁,那么它们的底层实现原理到底是什么样的呢?又是大家看到就想跳过的套娃解析环节。
比如我们执行了ReentrantLock的lock()方法,那它的内部是怎么在执行的呢?
可以看到,它的内部实际上啥都没做,而是交给了Sync对象在进行,并且,不只是这个方法,其他的很多方法都是依靠Sync对象在进行:
那么这个Sync对象是干什么的呢?可以看到,公平锁和非公平锁都是继承自Sync,而Sync是继承自AbstractQueuedSynchronizer,简称队列同步器:
所以,要了解它的底层到底是如何进行操作的,还得看队列同步器,我们就先从这里下手吧!

底层实现

AbstractQueuedSynchronizer(下面称为AQS)是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列。
一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表数据结构实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的,它像这样:
notion image
image-20230306171328049
AQS中有一个head字段和一个tail字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。我们先来了解一下每个结点都包含了哪些内容:
在一开始的时候,headtail都是nullstate为默认值0
不用担心双向链表不会进行初始化,初始化是在实际使用时才开始的,先不管,我们接着来看其他的初始化内容:
可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。
现在我们大致了解了一下它的底层运作机制,我们接着来看这个类是如何进行使用的,它提供了一些可重写的方法(根据不同的锁类型和机制,可以自由定制规则,并且为独占式和非独占式锁都提供了对应的方法),以及一些已经写好的模板方法(模板方法会调用这些可重写的方法),使用此类只需要将可重写的方法进行重写,并调用提供的模板方法,从而实现锁功能(学习过设计模式会比较好理解一些)
我们首先来看可重写方法:
可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:
我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire(),我们来看看它在AQS类中的实现细节:
首先会调用tryAcquire()方法(这里是由FairSync类实现的),如果尝试加独占锁失败(返回false了)说明可能这个时候有其他线程持有了此独占锁,所以当前线程得先等着,那么会调用addWaiter()方法将线程加入等待队列中:
在了解了addWaiter()方法会将节点加入等待队列之后,我们接着来看,addWaiter()会返回已经加入的节点,acquireQueued()在得到返回的节点时,也会进入自旋状态,等待唤醒(也就是开始进入到拿锁的环节了):
所以,acquire()中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用selfInterrupt()方法:
这里就是直接向当前线程发送中断信号了。
上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个parkunpark:
这里我们就把公平锁的lock()方法实现讲解完毕了(让我猜猜,已经晕了对吧,越是到源码越考验个人的基础知识掌握,基础不牢地动山摇)接着我们来看公平锁的tryAcquire()方法:
在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。
加锁过程已经OK,我们接着来看,它的解锁过程,unlock()方法是在AQS中实现的:
那么我们来看看tryRelease()方法是怎么实现的,具体实现在Sync中:
综上,我们来画一个完整的流程图:
notion image
image-20230306171428206
这里我们只讲解了公平锁,有关非公平锁和读写锁,还请各位观众根据我们之前的思路,自行解读。

公平锁一定公平吗?

前面我们讲解了公平锁的实现原理,那么,我们尝试分析一下,在并发的情况下,公平锁一定公平吗?
我们再次来回顾一下tryAcquire()方法的实现:
所以hasQueuedPredecessors()这个环节容不得半点闪失,否则会直接破坏掉公平性,假如现在出现了这样的情况:
线程1已经持有锁了,这时线程2来争抢这把锁,走到hasQueuedPredecessors(),判断出为 false,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中:
而碰巧不巧,这个时候线程3也来抢锁了,按照正常流程走到了hasQueuedPredecessors()方法,而在此方法中:
因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在插入节点状态,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。
一张图就是:
notion image
image-20230814160110441
因此公不公平全看hasQueuedPredecessors(),而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁,只有在等待队列存在节点时,才是真正公平的。

Condition实现原理

通过前面的学习,我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的,同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。那么我们接着来看看,它又是如何实现的呢,我们先从单个Condition对象进行分析:
在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:
这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:
notion image
image-20230306171600419
我们知道,当一个线程调用await()方法时,会进入等待状态,直到其他线程调用signal()方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。
我们先来看看最关键的await()方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:
  • 只有已经持有锁的线程才可以使用此方法
  • 当调用此方法后,会直接释放锁,无论加了多少次锁
  • 只有其他线程调用signal()或是被中断时才会唤醒等待中的线程
  • 被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)
好了,差不多可以上源码了:
实际上await()方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:
  • 只有持有锁的线程才能唤醒锁所属的Condition等待的线程
  • 优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
  • 唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
  • 拿到锁之后,线程才能恢复运行
notion image
image-20230306171620786
好了,上源码:
其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?
这里其实是为了进行优化而编写,直接unpark会有两种情况:
  • 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
  • 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws > 0之后!compareAndSetWaitStatus(p, ws, Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws, Node.SIGNAL) == false
如果这里被提前unpark,那么在await()方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。
所以,大致流程下:
notion image
image-20230306171643082
只要把整个流程理清楚,还是很好理解的。

自行实现锁类

既然前面了解了那么多AQS的功能,那么我就仿照着这些锁类来实现一个简单的锁:
  • 要求:同一时间只能有一个线程持有锁,不要求可重入(反复加锁无视即可)
到这里,我们对应队列同步器AQS的讲解就先到此为止了,当然,AQS的全部机制并非仅仅只有我们讲解的内容,一些我们没有提到的内容,还请各位观众自行探索,会有满满的成就感哦~

原子类

前面我们讲解了锁框架的使用和实现原理,虽然比较复杂,但是收获还是很多的(主要是观摩大佬写的代码)这一部分我们就来讲一点轻松的。
前面我们说到,如果要保证i++的原子性,那么我们的唯一选择就是加锁,那么,除了加锁之外,还有没有其他更好的解决方法呢?JUC为我们提供了原子类,底层采用CAS算法,它是一种用法简单、性能高效、线程安全地更新变量的方式。
所有的原子类都位于java.util.concurrent.atomic包下。

原子类介绍

常用基本数据类,有对应的原子类封装:
  • AtomicInteger:原子更新int
  • AtomicLong:原子更新long
  • AtomicBoolean:原子更新boolean
那么,原子类和普通的基本类在使用上有没有什么区别呢?我们先来看正常情况下使用一个基本类型:
现在我们使用int类型对应的原子类,要实现同样的代码该如何编写:
我们可以将int数值封装到此类中(注意必须调用构造方法,它不像Integer那样有装箱机制),并且通过调用此类提供的方法来获取或是对封装的int值进行自增,乍一看,这不就是基本类型包装类嘛,有啥高级的。确实,还真有包装类那味,但是它可不仅仅是简单的包装,它的自增操作是具有原子性的:
同样是直接进行自增操作,我们发现,使用原子类是可以保证自增操作原子性的,就跟我们前面加锁一样。怎么会这么神奇?我们来看看它的底层是如何实现的,直接从构造方法点进去:
可以看到,它的底层是比较简单的,其实本质上就是封装了一个volatile类型的int值,这样能够保证可见性,在CAS操作的时候不会出现问题。
可以看到最上面是和AQS采用了类似的机制,因为要使用CAS算法更新value的值,所以得先计算出value字段在对象中的偏移地址,CAS直接修改对应位置的内存即可(可见Unsafe类的作用巨大,很多的底层操作都要靠它来完成)
接着我们来看自增操作是怎么在运行的:
可以看到这里调用了unsafe.getAndAddInt(),套娃时间到,我们接着看看Unsafe里面写了什么:
可以看到这是一个do-while循环,那么这个循环在做一个什么事情呢?感觉就和我们之前讲解的AQS队列中的机制差不多,也是采用自旋形式,来不断进行CAS操作,直到成功。
notion image
image-20230306171720896
可见,原子类底层也是采用了CAS算法来保证的原子性,包括getAndSetgetAndAdd等方法都是这样。原子类也直接提供了CAS操作方法,我们可以直接使用:
如果想以普通变量的方式来设定值,那么可以使用lazySet()方法,这样就不采用volatile的立即可见机制了。
除了基本类有原子类以外,基本类型的数组类型也有原子类:
  • AtomicIntegerArray:原子更新int数组
  • AtomicLongArray:原子更新long数组
  • AtomicReferenceArray:原子更新引用数组
其实原子数组和原子类型一样的,不过我们可以对数组内的元素进行原子操作:
在JDK8之后,新增了DoubleAdderLongAdder,在高并发情况下,LongAdder的性能比AtomicLong的性能更好,主要体现在自增上,它的大致原理如下:在低并发情况下,和AtomicLong是一样的,对value值进行CAS操作,但是出现高并发的情况时,AtomicLong会进行大量的循环操作来保证同步,而LongAdder会将对value值的CAS操作分散为对数组cells中多个元素的CAS操作(内部维护一个Cell[] as数组,每个Cell里面有一个初始值为0的long型变量,在高并发时会进行分散CAS,就是不同的线程可以对数组中不同的元素进行CAS自增,这样就避免了所有线程都对同一个值进行CAS),只需要最后再将结果加起来即可。
notion image
image-20230306171732740
使用如下:
由于底层源码比较复杂,这里就不做讲解了。两者的性能对比(这里用到了CountDownLatch,建议学完之后再来看):
除了对基本数据类型支持原子操作外,对于引用类型,也是可以实现原子操作的:
JUC还提供了字段原子更新器,可以对类中的某个指定字段进行原子操作(注意字段必须添加volatile关键字):
了解了这么多原子类,是不是感觉要实现保证原子性的工作更加轻松了?

ABA问题及解决方案

我们来想象一下这种场景:
notion image
image-20230306171801800
线程1和线程2同时开始对a的值进行CAS修改,但是线程1的速度比较快,将a的值修改为2之后紧接着又修改回1,这时线程2才开始进行判断,发现a的值是1,所以CAS操作成功。
很明显,这里的1已经不是一开始的那个1了,而是被重新赋值的1,这也是CAS操作存在的问题(无锁虽好,但是问题多多),它只会机械地比较当前值是不是预期值,但是并不会关心当前值是否被修改过,这种问题称之为ABA问题。
那么如何解决这种ABA问题呢,JUC提供了带版本号的引用类型,只要每次操作都记录一下版本号,并且版本号不会重复,那么就可以解决ABA问题了:
至此,有关原子类的讲解就到这里。

并发容器

简单的讲完了,又该讲难一点的了。
注意: 本版块的重点在于探究并发容器是如何利用锁机制和算法实现各种丰富功能的,我们会忽略一些常规功能的实现细节(比如链表如何插入元素删除元素),而更关注并发容器应对并发场景算法上的实现(比如在多线程环境下的插入操作是按照什么规则进行的)
在单线程模式下,集合类提供的容器可以说是非常方便了,几乎我们每个项目中都能或多或少的用到它们,我们在JavaSE阶段,为各位讲解了各个集合类的实现原理,我们了解了链表、顺序表、哈希表等数据结构,那么,在多线程环境下,这些数据结构还能正常工作吗?

传统容器线程安全吗

我们来测试一下,100个线程同时向ArrayList中添加元素会怎么样:
不出意外的话,肯定是会报错的:
那么我们来看看报的什么错,从栈追踪信息可以看出,是add方法出现了问题:
也就是说,同一时间其他线程也在疯狂向数组中添加元素,那么这个时候有可能在ensureCapacityInternal(确认容量足够)执行之后,elementData[size++] = e;执行之前,其他线程插入了元素,导致size的值超出了数组容量。这些在单线程的情况下不可能发生的问题,在多线程下就慢慢出现了。
我们再来看看比较常用的HashMap呢?
经过测试发现,虽然没有报错,但是最后的结果并不是我们期望的那样,实际上它还有可能导致Entry对象出现环状数据结构,引起死循环。
所以,在多线程环境下,要安全地使用集合类,我们得找找解决方案了。

并发容器介绍

怎么才能解决并发情况下的容器问题呢?我们首先想到的肯定是给方法前面加个synchronzed关键字,这样总不会抢了吧,在之前我们可以使用Vector或是Hashtable来解决,但是它们的效率实在是太低了,完全依靠锁来解决问题,因此现在已经很少再使它们了,这里也不会再去进行讲解。
JUC提供了专用于并发场景下的容器,比如我们刚刚使用的ArrayList,在多线程环境下是没办法使用的,我们可以将其替换为JUC提供的多线程专用集合类:
我们发现,使用了CopyOnWriteArrayList之后,再没出现过上面的问题。
那么它是如何实现的呢,我们先来看看它是如何进行add()操作的:
可以看到添加操作是直接上锁,并且会先拷贝一份当前存放元素的数组,然后对数组进行修改,再将此数组替换(CopyOnWrite)接着我们来看读操作:
因此,CopyOnWriteArrayList对于读操作不加锁,而对于写操作是加锁的,类似于我们前面讲解的读写锁机制,这样就可以保证不丢失读性能的情况下,写操作不会出现问题。
接着我们来看对于HashMap的并发容器ConcurrentHashMap
可以看到这里的ConcurrentHashMap就没有出现之前HashMap的问题了。因为线程之间会争抢同一把锁,我们之前在讲解LongAdder的时候学习到了一种压力分散思想,既然每个线程都想抢锁,那我就干脆多搞几把锁,让你们每个人都能拿到,这样就不会存在等待的问题了,而JDK7之前,ConcurrentHashMap的原理也比较类似,它将所有数据分为一段一段地存储,先分很多段出来,每一段都给一把锁,当一个线程占锁访问时,只会占用其中一把锁,也就是仅仅锁了一小段数据,而其他段的数据依然可以被其他线程正常访问。
notion image
image-20230306171955430
这里我们重点讲解JDK8之后它是怎么实现的,它采用了CAS算法配合锁机制实现,我们先来回顾一下JDK8下的HashMap是什么样的结构:
notion image
img
HashMap就是利用了哈希表,哈希表的本质其实就是一个用于存放后续节点的头结点的数组,数组里面的每一个元素都是一个头结点(也可以说就是一个链表),当要新插入一个数据时,会先计算该数据的哈希值,找到数组下标,然后创建一个新的节点,添加到对应的链表后面。当链表的长度达到8时,会自动将链表转换为红黑树,这样能使得原有的查询效率大幅度降低!当使用红黑树之后,我们就可以利用二分搜索的思想,快速地去寻找我们想要的结果,而不是像链表一样挨个去看。
又是基础不牢地动山摇环节,由于ConcurrentHashMap的源码比较复杂,所以我们先从最简单的构造方法开始下手:
notion image
image-20230306172041623
我们发现,它的构造方法和HashMap的构造方法有很大的出入,但是大体的结构和HashMap是差不多的,也是维护了一个哈希表,并且哈希表中存放的是链表或是红黑树,所以我们直接来看put()操作是如何实现的,只要看明白这个,基本上就懂了:
怎么样,是不是感觉看着挺复杂,其实也还好,总结一下就是:
notion image
image-20230306172102878
我们接着来看看get()操作:
综上,ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()会优先考虑为哈希表进行扩容的原因。显然,这种加锁方式比JDK7的分段锁机制性能更好。
其实这里也只是简单地介绍了一下它的运行机制,ConcurrentHashMap真正的难点在于扩容和迁移操作,我们主要了解的是他的并发执行机制,有关它的其他实现细节,这里暂时不进行讲解。

阻塞队列

除了我们常用的容器类之外,JUC还提供了各种各样的阻塞队列,用于不同的工作场景。
阻塞队列本身也是队列,但是它是适用于多线程环境下的,基于ReentrantLock实现的,它的接口定义如下:
比如现在有一个容量为3的阻塞队列,这个时候一个线程put向其添加了三个元素,第二个线程接着put向其添加三个元素,那么这个时候由于容量已满,会直接被阻塞,而这时第三个线程从队列中取走2个元素,线程二停止阻塞,先丢两个进去,还有一个还是进不去,所以说继续阻塞。
notion image
image-20230306172123711
利用阻塞队列,我们可以轻松地实现消费者和生产者模式,还记得我们在JavaSE中的实战吗?
所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架有货物的时候,消费者可以从货架上拿走商品,生产者此时等待货架出现空位,进而补货,这样不断的循环。
通过多线程编程,来模拟一个餐厅的2个厨师和3个顾客,假设厨师炒出一个菜的时间为3秒,顾客吃掉菜品的时间为4秒,窗口上只能放一个菜。
我们来看看,使用阻塞队列如何实现,这里我们就使用ArrayBlockingQueue实现类:
可以看到,阻塞队列在多线程环境下的作用是非常明显的,算上ArrayBlockingQueue,一共有三种常用的阻塞队列:
  • ArrayBlockingQueue:有界带缓冲阻塞队列(就是队列是有容量限制的,装满了肯定是不能再装的,只能阻塞,数组实现)
  • SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)
  • LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)
这里我们以ArrayBlockingQueue为例进行源码解读,我们先来看看构造方法:
接着我们来看putoffer方法是如何实现的:
接着我们来看出队操作:
可见,如果各位对锁的使用非常熟悉的话,那么在阅读这些源码的时候,就会非常轻松了。
接着我们来看一个比较特殊的队列SynchronousQueue,它没有任何容量,也就是说正常情况下出队必须和入队操作成对出现,我们先来看它的内部,可以看到内部有一个抽象类Transferer,它定义了一个transfer方法:
乍一看,有点迷惑,难不成还要靠这玩意去实现put和take操作吗?实际上它是直接以生产者消费者模式进行的,由于不需要依靠任何容器结构来暂时存放数据,所以我们可以直接通过transfer方法来对生产者和消费者之间的数据进行传递。
比如一个线程put一个新的元素进入,这时如果没有其他线程调用take方法获取元素,那么会持续被阻塞,直到有线程取出元素,而transfer正是需要等生产者消费者双方都到齐了才能进行交接工作,单独只有其中一方都需要进行等待。
它在公平和非公平模式下,有两个实现,这里我们来看公平模式下的SynchronousQueue是如何实现的:
公平模式下,Transferer的实现是TransferQueue,是以先进先出的规则的进行的,内部有一个QNode类来保存等待的线程。
好了,我们直接上transfer()方法的实现(这里再次提醒各位,多线程环境下的源码分析和单线程的分析不同,我们需要时刻关注当前代码块的加锁状态,如果没有加锁,一定要具有多线程可能会同时运行的意识,这个意识在以后你自己处理多线程问题伴随着你,才能保证你的思路在多线程环境下是正确的):
所以,总结为以下流程:
notion image
image-20230306172203832
对于非公平模式下的SynchronousQueue,则是采用的栈结构来存储等待节点,但是思路也是与这里的一致,需要等待并进行匹配操作,各位如果感兴趣可以继续了解一下非公平模式下的SynchronousQueue实现。
在JDK7的时候,基于SynchronousQueue产生了一个更强大的TransferQueue,它保留了SynchronousQueue的匹配交接机制,并且与等待队列进行融合。
我们知道,SynchronousQueue并没有使用锁,而是采用CAS操作保证生产者与消费者的协调,但是它没有容量,而LinkedBlockingQueue虽然是有容量且无界的,但是内部基本都是基于锁实现的,性能并不是很好,这时,我们就可以将它们各自的优点单独拿出来,揉在一起,就成了性能更高的LinkedTransferQueue
相比 SynchronousQueue ,它多了一个可以存储的队列,我们依然可以像阻塞队列那样获取队列中所有元素的值,简单来说,LinkedTransferQueue其实就是一个多了存储队列的SynchronousQueue
接着我们来了解一些其他的队列:
  • PriorityBlockingQueue - 是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定。
  • DelayQueue - 它能够实现延迟获取元素,同样支持优先级。
我们先来看优先级阻塞队列:
我们的重点是DelayQueue,它能实现延时出队,也就是说当一个元素插入后,如果没有超过一定时间,那么是无法让此元素出队的。
可以看到此类只接受Delayed的实现类作为元素:
这里我们手动实现一个:
接着我们在主方法中尝试使用:
我们来研究一下DelayQueue是如何实现的,首先来看add()方法:
可以看到无论是哪种入队操作,都会加锁进行,属于常规操作。我们接着来看take()方法:
到此,有关并发容器的讲解就到这里。
下一章我们会继续讲解线程池以及并发工具类。
———————————————— 版权声明:本文为柏码知识库版权所有,禁止一切未经授权的转载、发布、出售等行为,违者将被追究法律责任。 原文链接:https://www.itbaima.cn/document/5tr1sm4ho6ygpt9q
Loading...