Dawn's Blogs

分享技术 记录成长

0%

Java面试之并发编程 (5) 线程池

线程池

线程池就是管理线程的资源池,有任务需要处理时,直接从线程池中取得线程进行处理,处理完成后线程会被放回线程池中,而不是立即销毁。Java 已经提供了线程,为什么还需要使用线程池?

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池

Java 中创建线程池有两种方法:ThreadPoolExecutor 和 Executor 框架。

  1. 使用 ThreadPoolExecutor 来创建线程池(推荐)。使用这种方式可以使得程序员更加明确线程池的运行规则,规避资源耗尽的风险。

通过构造方法实现

  1. 通过 Executor 框架的工具类 Executors 来创建,可以构造多种类型的线程池。这其实就是队 ThreadPoolExecutor 构造函数的封装。

img

使用 Executors 创建线程池的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

    CachedThreadPool:使用的是同步队列 SynchronousQueue,允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。

    ScheduledThreadPoolSingleThreadScheduledExecutor:使用的无界的延迟阻塞队列 DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

ThreadPoolExecutor

ThreadPoolExecutor 构造函数的定义如下,其中有三个重要的参数:

  • corePoolSize:任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize:任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

线程池各个参数的关系

线程池处理流程

线程池处理流程如下,需要注意的是,如果队列未满则会优先加入到队列中等待执行。

图解线程池实现原理

阻塞队列

workQueue 用于定义再核心线程满时,任务加入的阻塞队列。新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,可以结合 Executors 构建的线程池来分析。

  • 容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列):FixedThreadPoolSingleThreadExectorFixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
  • SynchronousQueue(同步队列):CachedThreadPoolSynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
  • DelayedWorkQueue(延迟阻塞队列):ScheduledThreadPoolSingleThreadScheduledExecutorDelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

拒绝策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,RejectedExecutionHandler 定义了拒绝策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

面试题 - 如何设计一个优先级线程池?

核心思想:选择优先级阻塞队列(PriorityBlockingQueue)作为任务队列,作为 ThreadPoolExecuter 的构造函数。要想让 PriorityBlockingQueue 实现对任务的排序,传入其中的任务必须是具备排序能力的,方式有两种:

  1. 提交到线程池的任务实现 Comparable 接口,并重写 compareTo 方法来指定任务之间的优先级比较规则。
  2. 创建 PriorityBlockingQueue 时传入一个 Comparator 对象来指定任务之间的排序规则(推荐)。

不过,这存在一些风险和问题,比如:

  • PriorityBlockingQueue 是无界的,可能堆积大量的请求,从而导致 OOM。解决方法:继承 PriorityBlockingQueue 并重写 offer 入队方法,当插入的元素数量超过指定值就返回 false 。
  • 可能会导致饥饿问题,即低优先级的任务长时间得不到执行。解决方法:优化设计,比如等待时间过长的任务会被移除并重新添加到队列中,但是优先级会被提升。
  • 由于需要对队列中的元素进行排序操作以及保证线程安全(并发控制采用的是可重入锁 ReentrantLock),因此会降低性能。这是无法避免的问题。