Z次元

前言

线程池: 简单理解,它就是一个管理线程的池子。

线程池的优点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的总结

工作流程

图解线程池流程
先来一张通俗易懂的:
图片
文字理解:工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。
将其翻译成线程池理解:
图片

关键参数

线程池关键参数的理解(在图中可以看到):

  • corePoolSize(必需):核心线程数。即池中一直保持存活的线程数,即使这些线程处于空闲。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。
  • maximumPoolSize(必需):池中允许的最大线程数。当核心线程全部繁忙且任务队列打满之后,线程池会临时追加线程,直到总线程数达到maximumPoolSize这个上限。
  • keepAliveTime(必需):线程空闲超时时间。当非核心线程处于空闲状态的时间超过这个时间后,该线程将被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
  • unit(必需):keepAliveTime参数的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)
  • workQueue(必需):任务队列,采用阻塞队列实现。当核心线程全部繁忙时,后续由execute方法提交的Runnable将存放在任务队列中,等待被线程处理。
  • threadFactory(可选):线程工厂。指定线程池创建线程的方式。
  • handler(可选):拒绝策略。当线程池中线程数达到maximumPoolSize且workQueue打满时,后续提交的任务将被拒绝,handler可以指定用什么方式拒绝任务。

图解参数之间的关系:
图片

工作原理

看完工作流程后再来看下线程池工作原理(图解):
图片

拒绝策略

  1. AbortPolicy(抛出一个异常,默认的)
  2. DiscardPolicy(直接丢弃任务)
  3. DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  4. CallerRunsPolicy(交给线程池调用所在的线程进行处理)

任务队列

ArrayBlockingQueue

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

LinkedBlockingQueue

LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列

DelayQueue

DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

SynchronousQueue

SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

创建方式

创建线程池的方式:

  1. 通过ThreadPoolExecutor构造函数实现(推荐)
  2. 通过 Executor 框架的工具类 Executors 来实现,总共有三种类型(方法内部也是调用了ThreadPoolExecutor的构造方法)
  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

ThreadPoolExecutor

《阿里巴巴 Java 开发手册》“并发处理”章节中,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。(具体原因可参照线程池的优点)
并且强制要求,不允许使用 Executors 去快捷创建线程池,而是通过 ThreadPoolExecutor 构造函数的方式,除了避免 OOM 的原因外,这样的处理方式可以让我们更加明确线程池的运行规则,规避资源耗尽的风险。
也就是说使用有界队列,控制线程创建数量。因为实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。我们应该显示地给我们的线程池命名,有助于后期快速定位问题。

Executors 返回线程池对象的弊端:
FixedThreadPoolSingleThreadExecutor :允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM(java.lang.OutOfMemoryError:内存用完了)。
CachedThreadPool ScheduledThreadPool:允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

示例:一个简单的for遍历线程

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class App 
{
    //corePoolSize: 定义核心线程数为 3
    private static final Integer CORE_POOL_SIZE = 3;
    //maximumPoolSize:定义最大线程数为 9
    private static final Integer MAX_POOL_SIZE = 9;
    //workQueue:定义任务队列为 ArrayBlockingQueue,并且容量为 90
    private static final Integer QUEUE_CAPACITY = 90;
    //keepAliveTime: 定义等待时间为 1
    private static final Long KEEP_ALIVE_TIME = 1L;


    public static void main(String[] args) {

        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,//unit: 等待时间的单位为 TimeUnit.SECONDS
                new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), //workQueue:定义任务队列为 ArrayBlockingQueue
                new ThreadPoolExecutor.CallerRunsPolicy());//handler:定义拒绝策略为 CallerRunsPolicy


        // 模拟向线程池提交(6个线程)任务
        for (int i = 0; i < 6; i++) {

            //执行线程;这里直接使用的Runnable匿名对象;也可以自行创建Runnable或Callable的实现类
            executor.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     Long start = System.currentTimeMillis(); //获取线程开始执行时间
                                     //这里随便写了一个遍历功能的线程
                                     for (int numValue = 0; numValue <= 2; numValue++) {

                                         System.out.println(Thread.currentThread().getName() + ":" + numValue + " " + new Date(System.currentTimeMillis()));
                                         try {
                                             Thread.sleep(500);
                                         } catch (InterruptedException e) {
                                             e.printStackTrace();
                                         }
                                     }
                                     System.out.println(Thread.currentThread().getName() + ":当前线程任务结束,总共花费时间:" + (System.currentTimeMillis() - start) / 1000 + "秒");
                                 }
                             }

            );
        }

        //终止线程池
        executor.shutdown(); //设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
        // executor.shutdownNow(); //(慎用)设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
        while (!executor.isTerminated()) {
            //使用isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,必须在shutdown()方法关闭线程池之后才能使用,
            //否则isTerminated()永远为false,线程将一直阻塞在该判断的地方,导致程序最终崩溃。
        }
        System.out.println("所有线程都已结束");
    }
}

打印结果:

pool-1-thread-2:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-1:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-3:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-2:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
所有线程都已结束

打印结果分析:
该示例中总共向线程池提交了6个线程任务,但是该线程池的核心线程只有3个,所以根据示例中定义的拒绝策略和任务队列,剩下的3个线程在任务队列中,根据打印结果可以看到,当前面的3个线程分别执行完毕时,空出的核心线程会被剩下的3个线程任务拿到,然后继续执行剩下的线程任务

Executors

FixedThreadPool

创建可重用固定线程数的线程池。

  • FixedThreadPool只有核心线程,且数量固定,没有非核心线程。
  • keepAliveTime设置为0L,代表多余的线程会被立即终止;即没有非空闲时间;因为不会产生多余的线程,所以keepAliveTime是无效的参数。
  • 任务队列为无界的阻塞队列LinkedBlockingQueue(容量默认为Integer.MAX_VALUE)。

图片

  1. 提交任务
  2. 如果线程数少于核心线程,创建核心线程执行任务
  3. 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  4. 如果线程执行完任务,去阻塞队列取任务,继续执行。

示例:

   ExecutorService executor = Executors.newFixedThreadPool(10);
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        executor.execute(()->{
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                //do nothing
                            }
            		});

使用场景:
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
缺点:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)

CachedThreadPool

一个根据需要创建线程的线程池。
核心线程数为0
最大线程数为Integer.MAX_VALUE
阻塞队列是SynchronousQueue
非核心线程空闲存活时间为60秒
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
   }

图片

  1. 提交任务
  2. 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
  3. 判断是否有空闲线程,如果有,就去取出任务执行。
  4. 如果没有空闲线程,就新建一个线程执行。
  5. 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则被销毁。

使用场景:
因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。


缺点:
CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

SingleThreadExecutor

只有一个线程的线程池。

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0
  ExecutorService executor = Executors.newSingleThreadExecutor();
                for (int i = 0; i < 5; i++) {
                    executor.execute(() -> {
                        System.out.println(Thread.currentThread().getName()+"正在执行");
                    });
     		   }

图片

  1. 提交任务
  2. 线程池是否有一条线程在,如果没有,新建线程执行任务
  3. 如果有,讲任务加到阻塞队列
  4. 当前的唯一线程,从队列取任务,执行完一个,再继续取,如此循环。。。。

使用场景:
适用于串行执行任务的场景,一个任务一个任务地执行。
缺点:
SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的任务队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的任务队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM,

常用方法

提交

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
AbstractExecutorService接口中的一个 submit 方法为例:

public Future<?> submit(Runnable task) {
        if (task == null) thrownew NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

上面方法调用的 newTaskFor()方法返回了一个 FutureTask 对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}

对比execute()方法:

public void execute(Runnable command) {
      ...
}

关闭

shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List;该方法要慎用,容易造成不可控的后果

判断

isShutDown 当调用 shutdown()方法后,返回为 true,否则为false。
isTerminated 当调用shutdown() 方法后,并且所有提交的任务完成后返回为 true,否则为false。

状态切换

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
  • 调用线程池的shutdownNow()方法,可以切换到STOP状态;

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • 线程池中执行的任务为空,进入TIDYING状态;

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0。
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

  • 该状态表示线程池彻底终止

配置线程数

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

参考文章:
新手也能看懂的线程池学习总结
Java线程池解析
深入Java线程池:从设计思想到源码解读

评论区
发表评论

这里还没有评论哦

快来发一条评论抢占前排吧

前言
线程池的总结
工作流程
关键参数
工作原理
拒绝策略
任务队列
ArrayBlockingQueue
LinkedBlockingQueue
DelayQueue
PriorityBlockingQueue
SynchronousQueue
创建方式
ThreadPoolExecutor
Executors
FixedThreadPool
CachedThreadPool
SingleThreadExecutor
常用方法
提交
关闭
判断
状态切换
配置线程数