JUC笔记(三)并发编程进阶

并发编程进阶

欢迎来到JUC学习的最后一章,王炸当然是放在最后了。

线程池

在我们的程序中,多多少少都会用到多线程技术,而我们以往都是使用Thread类来创建一个新的线程:
利用多线程,我们的程序可以更加合理地使用CPU多核心资源,在同一时间完成更多的工作。但是,如果我们的程序频繁地创建线程,由于线程的创建和销毁也需要占用系统资源,因此这样会降低我们整个程序的性能,那么怎么做,才能更高效地使用多线程呢?
我们其实可以将已创建的线程复用,利用池化技术,就像数据库连接池一样,我们也可以创建很多个线程,然后反复地使用这些线程,而不对它们进行销毁。
虽然听起来这个想法比较新颖,但是实际上线程池早已利用到各个地方,比如我们的Tomcat服务器,要在同一时间接受和处理大量的请求,那么就必须要在短时间内创建大量的线程,结束后又进行销毁,这显然会导致很大的开销,因此这种情况下使用线程池显然是更好的解决方案。
由于线程池可以反复利用已有线程执行多线程操作,所以它一般是有容量限制的,当所有的线程都处于工作状态时,那么新的多线程请求会被阻塞,直到有一个线程空闲出来为止,实际上这里就会用到我们之前讲解的阻塞队列。
所以我们可以暂时得到下面一个样子:
notion image
image-20230306172249412
当然,JUC提供的线程池肯定没有这么简单,接下来就让我们深入进行了解。

线程池的使用

我们可以直接创建一个新的线程池对象,它已经提前帮助我们实现好了线程的调度机制,我们先来看它的构造方法:
参数稍微有一点多,这里我们依次进行讲解:
  • corePoolSize:核心线程池大小,我们每向线程池提交一个多线程任务时,都会创建一个新的核心线程,无论是否存在其他空闲线程,直到到达核心线程池大小为止,之后会尝试复用线程资源。当然也可以在一开始就全部初始化好,调用prestartAllCoreThreads()即可。
  • maximumPoolSize:最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的非核心线程运行,但是不能超过最大线程池大小。
  • keepAliveTime:线程最大空闲时间,当一个非核心线程空闲超过一定时间,会自动销毁。
  • unit:线程最大空闲时间的时间单位
  • workQueue:线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止,这里可以使用我们上一章学到的阻塞队列。
  • threadFactory:线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。
  • handler:拒绝策略,当等待队列和线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理。
最为重要的就是线程池大小的限定了,这个也是很有学问的,合理地分配大小会使得线程池的执行效率事半功倍:
  • 首先我们可以分析一下,线程池执行任务的特性,是CPU 密集型还是 IO 密集型
    • CPU密集型: 主要是执行计算任务,响应时间很快,CPU一直在运行,这种任务CPU的利用率很高,那么线程数应该是根据 CPU 核心数来决定,CPU 核心数 = 最大同时执行线程数,以 i5-9400F 处理器为例,CPU 核心数为 6,那么最多就能同时执行 6 个线程。
    • IO密集型: 主要是进行 IO 操作,因为执行 IO 操作的时间比较较长,比如从硬盘读取数据之类的,CPU就得等着IO操作,很容易出现空闲状态,导致 CPU 的利用率不高,这种情况下可以适当增加线程池的大小,让更多的线程可以一起进行IO操作,一般可以配置为CPU核心数的2倍。
这里我们手动创建一个新的线程池看看效果:
这里我们创建了一个核心容量为2,最大容量为4,等待队列长度为2,空闲时间为3秒的线程池,现在我们向其中执行6个任务,每个任务都会进行1秒钟休眠,那么当线程池中2个核心线程都被占用时,还有4个线程就只能进入到等待队列中了,但是等待队列中只有2个容量,这时紧接着的2个任务,线程池将直接尝试创建线程,由于不大于最大容量,因此可以成功创建。最后所有线程完成之后,在等待5秒后,超过了线程池的最大空闲时间,非核心线程被回收了,所以线程池中只有2个线程存在。
那么要是等待队列设定为没有容量的SynchronousQueue呢,这个时候会发生什么?
可以看到,前4个任务都可以正常执行,但是到第五个任务时,直接抛出了异常,这其实就是因为等待队列的容量为0,相当于没有容量,那么这个时候,就只能拒绝任务了,拒绝的操作会根据拒绝策略决定。
线程池的拒绝策略默认有以下几个:
  • AbortPolicy(默认):像上面一样,直接抛异常。
  • CallerRunsPolicy:直接让提交任务的线程运行这个任务,比如在主线程向线程池提交了任务,那么就直接由主线程执行。
  • DiscardOldestPolicy:丢弃队列中最近的一个任务,替换为当前任务。
  • DiscardPolicy:什么也不用做。
这里我们进行一下测试:
CallerRunsPolicy策略是谁提交的谁自己执行,所以:
可以看到,当队列塞不下时,直接在主线程运行任务,运行完之后再继续向下执行。
我们吧策略修改为DiscardOldestPolicy试试看:
它会移除等待队列中的最近的一个任务,所以可以看到有一个任务实际上是被抛弃了的:
比较有意思的是,如果选择没有容量的SynchronousQueue作为等待队列会爆栈:
这是为什么呢?我们来看看这个拒绝策略的源码:
可以看到,它会先对等待队列进行出队操作,但是由于SynchronousQueue压根没容量,所有这个操作毫无意义,然后就会递归执行execute方法,而进入之后,又发现没有容量不能插入,于是又重复上面的操作,这样就会无限的递归下去,最后就爆栈了。
当然,除了使用官方提供的4种策略之外,我们还可以使用自定义的策略:
接着我们来看线程创建工厂,我们可以自己决定如何创建新的线程:
这里传入的Runnable对象就是我们提交的任务,可以看到需要我们返回一个Thread对象,其实就是线程池创建线程的过程,而如何创建这个对象,以及它的一些属性,就都由我们来决定。
各位有没有想过这样一个情况,如果我们的任务在运行过程中出现异常了,那么是不是会导致线程池中的线程被销毁呢?
可以看到,出现异常之后,再次提交新的任务,执行的线程是一个新的线程了。
除了我们自己创建线程池之外,官方也提供了很多的线程池定义,我们可以使用Executors工具类来快速创建线程池:
可以看到它的内部实现为:
这里直接将最大线程和核心线程数量设定为一样的,并且等待时间为0,因为压根不需要,并且采用的是一个无界的LinkedBlockingQueue作为等待队列。
使用newSingleThreadExecutor来创建只有一个线程的线程池:
原理如下:
可以看到这里并不是直接创建的一个ThreadPoolExecutor对象,而是套了一层FinalizableDelegatedExecutorService,那么这个又是什么东西呢?
所以,下面两种写法的区别在于:
前者实际上是被代理了,我们没办法直接修改前者的相关属性,显然使用前者创建只有一个线程的线程池更加专业和安全(可以防止属性被修改)一些。
最后我们来看newCachedThreadPool方法:
我们来看看它的实现:
可以看到,核心线程数为0,那么也就是说所有的线程都是非核心线程,也就是说线程空闲时间超过1秒钟,一律销毁。但是它的最大容量是Integer.MAX_VALUE,也就是说,它可以无限制地增长下去,所以这玩意一定要慎用。

执行带返回值的任务

一个多线程任务不仅仅可以是void无返回值任务,比如我们现在需要执行一个任务,但是我们需要在任务执行之后得到一个结果,这个时候怎么办呢?
这里我们就可以使用到Future了,它可以返回任务的计算结果,我们可以通过它来获取任务的结果以及任务当前是否完成:
当然结果也可以一开始就定义好,然后等待Runnable执行完之后再返回:
还可以通过传入FutureTask对象的方式:
我们可以还通过Future对象获取当前任务的一些状态:
我们来试试看在任务执行途中取消任务:

执行定时任务

既然线程池怎么强大,那么线程池能不能执行定时任务呢?我们之前如果需要执行一个定时任务,那么肯定会用到Timer和TimerTask,但是它只会创建一个线程处理我们的定时任务,无法实现多线程调度,并且它无法处理异常情况一旦抛出未捕获异常那么会直接终止,显然我们需要一个更加强大的定时器。
JDK5之后,我们可以使用ScheduledThreadPoolExecutor来提交定时任务,它继承自ThreadPoolExecutor,并且所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE,并且都是采用的DelayedWorkQueue作为等待队列。
我们来测试一下它的方法,这个方法可以提交一个延时任务,只有到达指定时间之后才会开始:
我们也可以像之前一样,传入一个Callable对象,用于接收返回值:
可以看到schedule方法返回了一个ScheduledFuture对象,和Future一样,它也支持返回值的获取、包括对任务的取消同时还支持获取剩余等待时间。
那么如果我们希望按照一定的频率不断执行任务呢?
Executors也为我们预置了newScheduledThreadPool方法用于创建线程池:

线程池实现原理

前面我们了解了线程池的使用,那么接着我们来看看它的详细实现过程,结构稍微有点复杂,坐稳,发车了。
这里需要首先介绍一下ctl变量:
notion image
image-20230306172347411
我们先从最简单的入手,看看在调用execute方法之后,线程池会做些什么:
是不是感觉思路还挺清晰的,我们接着来看addWorker是怎么创建和执行任务的,又是一大堆代码:
接着我们来看Worker类是如何实现的,它继承自AbstractQueuedSynchronizer,时隔两章,居然再次遇到AQS,那也就是说,它本身就是一把锁:
最后我们来看看一个Worker到底是怎么在进行任务的:
那么它是怎么从阻塞队列里面获取任务的呢:
虽然我们的源码解读越来越深,但是只要各位的思路不断,依然是可以继续往下看的。到此,有关execute()方法的源码解读,就先到这里。
接着我们来看当线程池关闭时会做什么事情:
shutdownNow()方法也差不多,但是这里会更直接一些:
最后的最后,我们再来看看tryTerminate()是怎么完完全全终止掉一个线程池的:
OK,有关线程池的实现原理,我们就暂时先介绍到这里,关于更高级的定时任务线程池,这里就不做讲解了。

并发工具类

计数器锁 CountDownLatch

多任务同步神器。它允许一个或多个线程,等待其他线程完成工作,比如现在我们有这样的一个需求:
  • 有20个计算任务,我们需要先将这些任务的结果全部计算出来,每个任务的执行时间未知
  • 当所有任务结束之后,立即整合统计最终结果
要实现这个需求,那么有一个很麻烦的地方,我们不知道任务到底什么时候执行完毕,那么可否将最终统计延迟一定时间进行呢?但是最终统计无论延迟多久进行,要么不能保证所有任务都完成,要么可能所有任务都完成了而这里还在等。
所以说,我们需要一个能够实现子任务同步的工具。
我们在调用await()方法之后,实际上就是一个等待计数器衰减为0的过程,而进行自减操作则由各个子线程来完成,当子线程完成工作后,那么就将计数器-1,所有的子线程完成之后,计数器为0,结束等待。
那么它是如何实现的呢?实现 原理非常简单:
在深入讲解之前,我们先大致了解一下CountDownLatch的基本实现思路:
  • 利用共享锁实现
  • 在一开始的时候就是已经上了count层锁的状态,也就是state = count
  • await()就是加共享锁,但是必须state0才能加锁成功,否则按照AQS的机制,会进入等待队列阻塞,加锁成功后结束阻塞
  • countDown()就是解1层锁,也就是靠这个方法一点一点把state的值减到0
由于我们前面只对独占锁进行了讲解,没有对共享锁进行讲解,这里还是稍微提一下它:
我们接着来看,它的countdown过程:
可能看完之后还是有点乱,我们再来理一下:
  • 共享锁是线程共享的,同一时刻能有多个线程拥有共享锁。
  • 如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁,所以得传播下去继续尝试唤醒后面的结点,不像独占锁,独占的压根不需要考虑这些。
  • 如果一个线程刚释放了锁,不管是独占锁还是共享锁,都需要唤醒后续等待结点的线程。
回到CountDownLatch,再结合整个AQS共享锁的实现机制,进行一次完整的推导,看明白还是比较简单的。

循环屏障 CyclicBarrier

好比一场游戏,我们必须等待房间内人数足够之后才能开始,并且游戏开始之后玩家需要同时进入游戏以保证公平性。
假如现在游戏房间内一共5人,但是游戏开始需要10人,所以我们必须等待剩下5人到来之后才能开始游戏,并且保证游戏开始时所有玩家都是同时进入,那么怎么实现这个功能呢?我们可以使用CyclicBarrier,翻译过来就是循环屏障,那么这个屏障正式为了解决这个问题而出现的。
可以看到,循环屏障会不断阻挡线程,直到被阻挡的线程足够多时,才能一起冲破屏障,并且在冲破屏障时,我们也可以做一些其他的任务。这和人多力量大的道理是差不多的,当人足够多时方能冲破阻碍,到达美好的明天。当然,屏障由于是可循环的,所以它在被冲破后,会重新开始计数,继续阻挡后续的线程:
可以看到,通过使用循环屏障,我们可以对线程进行一波一波地放行,每一波都放行5个线程,当然除了自动重置之外,我们也可以调用reset()方法来手动进行重置操作,同样会重新计数:
可以看到,在调用reset()之后,处于等待状态下的线程,全部被中断并且抛出BrokenBarrierException异常,循环屏障等待线程数归零。那么要是处于等待状态下的线程被中断了呢?屏障的线程等待数量会不会自动减少?
可以看到,当await()状态下的线程被中断,那么屏障会直接变成损坏状态,一旦屏障损坏,那么这一轮就无法再做任何等待操作了。也就是说,本来大家计划一起合力冲破屏障,结果有一个人摆烂中途退出了,那么所有人的努力都前功尽弃,这一轮的屏障也不可能再被冲破了(所以CyclicBarrier告诉我们,不要做那个害群之马,要相信你的团队,不然没有好果汁吃),只能进行reset()重置操作进行重置才能恢复正常。
乍一看,怎么感觉和之前讲的CountDownLatch有点像,好了,这里就得区分一下了,千万别搞混:
  • CountDownLatch:
      1. 它只能使用一次,是一个一次性的工具
      1. 它是一个或多个线程用于等待其他线程完成的同步工具
  • CyclicBarrier
      1. 它可以反复使用,允许自动或手动重置计数
      1. 它是让一定数量的线程在同一时间开始运行的同步工具
我们接着来看循环屏障的实现细节:
看完了CyclicBarrier的源码之后,是不是感觉比CountDownLatch更简单一些?

信号量 Semaphore

还记得我们在《操作系统》中学习的信号量机制吗?它在解决进程之间的同步问题中起着非常大的作用。
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。
通过使用信号量,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。简单来说,它就是一个可以被N个线程占用的排它锁(因此也支持公平和非公平模式),我们可以在最开始设定Semaphore的许可证数量,每个线程都可以获得1个或n个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
我们也可以通过Semaphore获取一些常规信息:
我们可以手动回收掉所有的许可证:
这里我们模拟一下,比如现在有10个线程同时进行任务,任务要求是执行某个方法,但是这个方法最多同时只能由5个线程执行,这里我们使用信号量就非常合适。

数据交换 Exchanger

线程之间的数据传递也可以这么简单。
使用Exchanger,它能够实现线程之间的数据交换:
在调用exchange方法后,当前线程会等待其他线程调用同一个exchanger对象的exchange方法,当另一个线程也调用之后,方法会返回对方线程传入的参数。
可见功能还是比较简单的。

Fork/Join框架

在JDK7时,出现了一个新的框架用于并行执行任务,它的目的是为了把大型任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果,并且这些小任务都是同时在进行,大大提高运算效率。Fork就是拆分,Join就是合并。
我们来演示一下实际的情况,比如一个算式:18x7+36x8+9x77+8x53,可以拆分为四个小任务:18x7、36x8、9x77、8x53,最后我们只需要将这四个任务的结果加起来,就是我们原本算式的结果了,有点归并排序的味道。
notion image
image-20230306172442485
它不仅仅只是拆分任务并使用多线程,而且还可以利用工作窃取算法,提高线程的利用率。
工作窃取算法: 是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
notion image
image-20230306172457006
现在我们来看看如何使用它,这里以计算1-1000的和为例,我们可以将其拆分为8个小段的数相加,比如1-125、126-250… ,最后再汇总即可,它也是依靠线程池来实现的:
可以看到,结果非常正确,但是整个计算任务实际上是拆分为了8个子任务同时完成的,结合多线程,原本的单线程任务,在多线程的加持下速度成倍提升。
包括Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现:
并行排序的性能在多核心CPU环境下,肯定是优于普通排序的,并且排序规模越大优势越显著。
至此,并发编程篇完结。
———————————————— 版权声明:本文为柏码知识库版权所有,禁止一切未经授权的转载、发布、出售等行为,违者将被追究法律责任。 原文链接:https://www.itbaima.cn/document/1scf51z5300mzxkh
Loading...