Java并发-线程池

本文最后更新于:2 年前

优点

  • 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。
  • 提⾼响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。
  • 提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控

创建的两种方式

实现 Runnable 接⼝和 Callable 接⼝的区别

Runnable ⾃ Java 1.0 以来⼀直存在,但 Callable 仅在 Java 1.5 中引⼊,⽬的就是为了来处理 Runnable 不⽀持的⽤例。

Runnable 接⼝不会返回结果或抛出检查异常,但是 Callable 接⼝可以。所以,如果任务不需要返回结果或抛出异常推荐使⽤ Runnable 接⼝,这样代码看起来会更加简洁。

⼯具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。
( Executors.callable(Runnable task )或 Executors.callable(Runnable task,Object resule)。

执⾏ execute()⽅法和 submit()⽅法的区别是什么呢?

  1. execute() ⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执⾏成功与否;
  2. submit() ⽅法⽤于提交需要返回值的任务。线程池会返回⼀个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执⾏成功,并且可以通过 Future 的 get() ⽅法来获取返回值get() ⽅法会阻塞当前线程直到任务完成,⽽使⽤ get(long timeout,TimeUnitunit)⽅法则会阻塞当前线程⼀段时间后⽴即返回,这时候有可能任务没有执⾏完。

《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤ Executors 去创建,⽽是通 ThreadPoolExecutor 的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的⻛险。

Executors 返回线程池对象的弊端如下:

FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列⻓度为 Integer.MAX_VALUE ,可能堆积⼤量的请求,从⽽导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建⼤量线程,从⽽导致 OOM。

创建线程池的方式

  1. 构造方法:new Threadpoolexecutor(七大参数)。
  2. 工具类:executors.new ……
  • FixedThreadPool : 该⽅法返回⼀个固定线程数量的线程池。该线程池中的线程数量始终不变。当有⼀个新的任务提交时,线程池中若有空闲线程,则⽴即执⾏。若没有,则新的任务会被暂存在⼀个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: ⽅法返回⼀个只有⼀个线程的线程池。若多余⼀个任务被提交到该线程池,任务会被保存在⼀个任务队列中,待线程空闲,按先⼊先出的顺序执⾏队列中的任务。
  • CachedThreadPool: (适合于负载不高,每个线程时间较短)该⽅法返回⼀个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复⽤,则会优先使⽤可复⽤的线程。若所有线程均在⼯作,⼜有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执⾏完毕后,将返回线程池进⾏复⽤。
  • newScheduledThreadPool:创建一个以延迟或定时的方式来执行任务的线程池,工作队列为 DelayedWorkQueue。适用于需要多个后台线程执行周期任务

核心参数

image-20210803222327559

3 个最重要的参数:

  • corePoolSize : 当线程池运行的线程少于 corePoolSize 时,将创建一个新线程来处理请求,即使其
    他工作线程处于空闲状态,核⼼线程数线程数定义了最⼩可以同时运⾏的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运⾏的线程数量变为最⼤线程数。
  • workQueue : 当新任务来的时候会先判断当前运⾏的线程数量是否达到核⼼线程数,如果达到的话,新任务就会被存放在队列中。
    其他常⻅参数:
  • keepAliveTime :当线程池中的线程数量⼤于 corePoolSize 的时候,如果这时没有新的任务提交,核⼼线程外的线程不会⽴即销毁,⽽是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁;
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会⽤到。
  • handler :饱和策略。

ThreadPoolExecutor 饱和策略定义

如果当前同时运⾏的线程数量达到最⼤线程数量并且队列也已经被放满了任务时, ThreadPoolTaskExecutor 定义⼀些策略:

  • ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException 来拒绝新任务的处理。

  • ThreadPoolExecutor.CallerRunsPolicy :调⽤执⾏⾃⼰的线程运⾏任务。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应⽤程序可以承受此延迟并且你不能任务丢弃任何⼀个任务请求的话,你可以选择这个策略。

  • ThreadPoolExecutor.DiscardPolicy : 不处理新任务,直接丢弃掉。

  • ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求

执行流程

image-20210803222534163

线程池状态

image-20210803223448446

ctl

ctl 是一个打包两个概念字段的原子整数。
1)workerCount:指示线程的有效数量
2)runState:指示线程池的运行状态,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 等状态。
int 类型有 32 位,其中 ctl 的低 29 为用于表示 workerCount,高 3 位用于表示 runState,如下图所示。

这么设计的主要好处是将对 runState 和 workerCount 的操作封装成了一个原子操作

runState 和 workerCount 是线程池正常运转中的 2 个最重要属性,线程池在某一时刻该做什么操作,取决于这 2 个属性的值。

因此无论是查询还是修改,我们必须保证对这 2 个属性的操作是属于“同一时刻”的,也就是原子操作,否则就会出现错乱的情况。如果我们使用 2 个变量来分别存储,要保证原子性则需要额外进行加锁操作,这显然会带来额外的开销,而将这 2 个变量封装成 1 个 AtomicInteger 则不会带来额外的加锁开销,而且只需使用简单的位操作就能分别得到 runState 和 workerCount。

由于这个设计,workerCount 的上限 CAPACITY = (1 << 29) - 1,对应的二进制原码为:0001 1111 1111 1111 1111 1111 1111 1111(不用数了,29 个 1)。
通过 ctl 得到 runState,只需通过位操作:ctl & CAPACITY。
~(按位取反),于是“
CAPACITY”的值为:1110 0000 0000 0000 0000 0000 0000 0000,只有高 3 位为 1,与 ctl 进行 & 操作,结果为 ctl 高 3 位的值,也就是 runState。

通过 ctl 得到 workerCount 则更简单了,只需通过位操作:c & CAPACITY

在我们实际使用中,线程池的大小配置多少合适?

要想合理的配置线程池大小,首先我们需要区分任务是计算密集型还是 I/O 密集型

对于计算密集型,设置 线程数 = CPU 数 + 1,通常能实现最优的利用率。

对于 I/O 密集型,网上常见的说法是设置 线程数 = CPU 数 * 2 ,这个做法是可以的,但个人觉得不是最优的。

在我们日常的开发中,我们的任务几乎是离不开 I/O 的,常见的网络 I/O(RPC 调用)、磁盘 I/O(数据库操作),并且 I/O 的等待时间通常会占整个任务处理时间的很大一部分,在这种情况下,开启更多的线程可以让 CPU 得到更充分的使用,一个较合理的计算公式如下:
线程数 = CPU 数 * CPU 利用率 * (任务等待时间 / 任务计算时间 + 1)

例如我们有个定时任务,部署在 4 核的服务器上,该任务有 100ms 在计算,900ms 在 I/O 等待,则线程数约为:4 * 1 * (1 + 900 / 100) = 40 个。

当然,具体我们还要结合实际的使用场景来考虑。如果要求比较精确,可以通过压测来获取一个合理的值。