当前位置:科技动态 > 深入理解线程池ThreadPoolExecutor并实际使用

深入理解线程池ThreadPoolExecutor并实际使用

  • 发布:2023-09-30 22:09

在讲线程池之前,我们先来说说线程。

什么是线程

线程分为单线程和多线程;单线程意味着一个线程正在执行任务。多线程意味着创建多个线程同时执行任务。
比如我们使用浏览器浏览网页,如果一次只能打开一个窗口,这就是单线程;现在我们的浏览器肯定可以打开多个窗口,比如我们可以在一个窗口中听音乐,其他窗口也可以看新闻,这就是多线程的概念。并行和并发也是同一概念。例如,如果你正在开车,然后有朋友打电话,并行性:开车时,你使用蓝牙耳机接听电话,同时处理;并发:停在路边,接电话,接电话后继续行驶。

什么是线程池

​ 创建线程需要资源和时间。如果任务来了才创建线程,响应时间会变长。并且一个进程可以创建的线程数量是有限的。

​为了避免这些问题,在程序启动时会创建几个线程来响应处理。它们被称为线程池,里面的线程被称为工作线程。

java API提供了Executor框架来创建不同的线程池。

为什么要使用线程池

避免频繁创建和销毁线程,实现线程对象的复用;线程资源管理

1。如何创建线程池

JAVA中创建线程池主要有两种方法。一种是Executors工厂类提供的方法,它提供了4种不同的线程池供使用。另一个类是通过 ThreadPoolExecutor 类创建的。

1.1 Executors工厂类提供的方法

1.1.1) newCachedThreadPool

创建可缓存的线程池。如果线程数超过处理需求,一段时间后就会回收缓存。如果线程数量不够,则会创建新线程。

公共 静态   main字符串[]参数 {
ExecutorService executorService=执行器.newCachedThreadPool) );
对于int= 0;<10;++) {
int最终I=i;
executorService.执行(()-> {
系统.out.println(LocalDateTime .现在()+" " +线程当前线程getName()+" "+最终I);
尝试{
线程.睡眠(3000);
} catch InterruptedException)  e) {e.printStackTrace();
}
});
}
}
2021-06-16T10:01:27.728池 -1-螺纹-10
2021-06-16T10:01:27.728-1-螺纹-3 2
2021-06-16T10:01:27.728-1-螺纹-2 1
2021-06-16T10:01:27.728-1-螺纹-4 3
2021-06-16T10:01:27.728-1-螺纹-5 4
2021-06-16T10:01:27.728-1-螺纹-6 5
2021-06-16T10:01:27.728-1-螺纹-10 9
2021-06-16T10:01:27.728-1-螺纹-8 72021-06-16T10:01:27.728-1-螺纹-7 6
2021-06-16T10:01:27.728-1-螺纹-9 8

初始线程池没有线程,如果线程不足,就会不断创建新线程,所以线程名称都不同

1.1.2) 新固定线程池

创建固定大小的线程池,控制并发线程数,多余的线程会在队列中等待。

公共 静态   main字符串[]参数{
ExecutorService executorService =执行器.newFixedThreadPool(3);
对于int=  0; i < 10; i ++){
int最终I=i;
executorService.执行(() -> {
//获取线程名称,默认格式:pool-1-thread-1系统.out.println(LocalDateTime .现在()+" " +线程当前线程getName()+" "+最终I);
尝试{
线程.睡眠(3000);
} catch InterruptedException)  e) {
e.printStackTrace();
}
});
}
}
2021-06-16T10:12:36.436池 -1-螺纹-21
2021-06-16T10:12:36.436-1-螺纹-3 2
2021-06-16T10:12:36.436-1-螺纹-1 0
2021-06-16T10:12:39.447-1-螺纹-1 32021-06-16T10:12:39.447-1-螺纹-2 5
2021-06-16T10:12:39.447-1-螺纹-3 4
2021-06-16T10:12:42.452-1-螺纹-2 6
2021-06-16T10:12:42.452-1-螺纹-17
2021-06-16T10:12:42.452-1-螺纹-3 8
2021-06-16T10:12:45.460-1-螺纹-1 9

因为线程池大小是固定的,这里设置的是3个线程,所以线程名只有3个。线程因为线程不足会进入队列等待线程空闲,所以日志间隔3秒输出

1.1.3)新ScheduledThreadPool

创建一个循环的线程池,支持定时及循环执行任务。

公共 静态   main字符串[]参数 {ScheduledExecutorService executorService =执行器.newScheduledThreadPool() );
对于int=  0; i < 10; i ++){
int最终I=i;
executorService.时间表(() -> {
//获取线程名称,默认格式:pool-1-thread-1
系统.out.println(本地日期时间 .现在()+" “+线程当前线程getName()+" " +最终I);
尝试{
线程.睡眠(3000);
} catch InterruptedException)  e) {
e.printStackTrace();
}}3时间单位秒 );
}
}
2021-06-16T10:16:41.187池 -1-螺纹-21
2021-06-16T10:16:41.187-1-螺纹-1 0
2021-06-16T10:16:41.187-1-螺纹-3 2
2021-06-16T10:16:44.200-1-螺纹-1 3
2021-06-16T10:16:44.200-1-螺纹-3 4
2021-06-16T10:16:44.200-1-螺纹-2 5
2021-06-16T10:16:47.221-1-螺纹-1 6
2021-06-16T10:16:47.221-1-螺纹-2 72021-06-16T10:16:47.221-1-螺纹-3 8
2021-06-16T10:16:50.236-1-螺纹-3 9

延迟设置为3秒,所以任务提交后3秒才会开始执行。因为这里设置了核心线程数为3,不足的线程会进入队列等待空闲线程,所以每3秒输出一次日志。

注意:这里使用的是ScheduledExecutorService类的schedule()方法,而不是ExecutorService类的execute()方法。

1.1.4) newSingleThreadExecutor

创建单线程线程池,保证所有任务按照指定的顺序(先进先出、后进先出、优先级)执行。

公共 静态   main字符串[]参数{
ExecutorService executorService =执行器.newSingleThreadExecutor( );
对于int=  0; i < 10; i ++){
int最终I=i;executorService.执行(() -> {
//获取线程名称,默认格式:pool-1-thread-1
系统.out.println(本地日期时间 .现在()+" “+线程当前线程getName()+" " +最终I);
尝试{
线程.睡眠(3000);
} catch InterruptedException)  e) {
e.printStackTrace();
}
});
}
}
2021-06-16T10:19:20.117池 -1-螺纹-10
2021-06-16T10:19:23.122-1-螺纹-112021-06-16T10:19:26.132-1-螺纹-1 2
2021-06-16T10:19:29.147-1-螺纹-1 3
2021-06-16T10:19:32.147-1-螺纹-14
2021-06-16T10:19:35.152-1-螺纹-15
2021-06-16T10:19:38.155-1-螺纹-16
2021-06-16T10:19:41.165-1-螺纹-17
2021-06-16T10:19:44.166-1-螺纹-18
2021-06-16T10:19:47.180-1-螺纹-1 9

只有一个线程,所以线程名均相同,且是每隔3秒按顺序输出的

1.2完成戏剧类

霹雳类提供了4种构造方法,可以根据需要来自定义。

1.2.1 线程池执行规则

当线程数小于corePoolSize时,创建一个线程;

当线程数大于等于corePoolSize且任务队列未满时,将任务放入任务队列中;

当线程数大于等于corePoolSize时,任务队列已满;

如果线程数小于最大线程数,则创建线程;

如果线程数等于最大线程数,则抛出异常,任务被拒绝。

公共ThreadPoolExecutorint corePoolSize ,
int最大池大小,
 keepAliveTime,
TimeUnit 单位,
BlockingQueue<可运行>workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler 处理程序) {
//省略
}

可以看到有7个参数:

1.2.2 参数说明

1) corePoolSize 核心线程数,线程池中始终存活的线程数

2)maximumPoolSize最大线程数,线程池允许的最大线程数

3)keepAliveTime生存时间,在没有任务执行的情况下,线程可以持续多久直到终止

4)单位参数keepAliveTime时间单位

TimeUnit.DAYS 天
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分钟
TimeUnit.SECONDS 秒
TimeUnit.MILLISECONDS 毫秒TimeUnit.微秒微妙
时间单位.NANOSECONDS 纳秒

5)workQueue阻塞队列,用于存放等待执行的任务,全部线程安全

ArrayBlockingQueue 由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 由链表结构组成的有界阻塞队列。
SynchronousQueue 不存储元素的阻塞队列,即将元素直接提交给线程,不保留元素。
PriorityBlockingQueue 支持优先级排序的无界阻塞队列。
DelayQueue 使用优先级队列实现的无界阻塞队列,只有在延迟到期时才能从中提取元素。
LinkedTransferQueue 由链表结构组成的无界阻塞队列。与SynchronousQueue类似,它也包含非阻塞方法。
LinkedBlockingDeque 由链表结构组成的双向阻塞队列。

常用的有LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

6)threadFactory线程工厂,主要用于创建线程,默认为普通优先级,非守护线程

7)handler拒绝策略,拒绝任务的策略。默认为 AbortPolicy。

AbortPolicy 拒绝并抛出异常。
CallerRunsPolicy 重试提交当前任务,即再次调用 execute()方法运行任务。
DiscardOldestPolicy 丢弃队列头部(最旧的)任务并执行当前任务。
DiscardPolicy 丢弃当前任务。

1.2.3 代码示例

公共 静态   螺纹5() {ExecutorService executorService = new 2 10 1 时间单位微秒  ArrayBlockingQueue <>(5,));
对于int= 0;<10;++) {
int索引=i;
executorService.执行(()-> {
系统.out.println(LocalDateTime .现在()+ " " +线程当前线程().getName()+》《+》 索引);
尝试{
线程.睡眠(2000);} catch InterruptedException)  e) {
e.printStackTrace();
}
});
}
}
2021-06-16T15:32:41.801-1-螺纹-2 1
2021-06-16T15:32:41.801泳池-1-线程-3 7
2021-06-16T15:32:41.801泳池-1-线程-5 9
2021-06-16T15:32:41.801泳池-1-线程-10
2021-06-16T15:32:41.801泳池-1-线程-48
2021-06-16T15:32:43.817-1-线程-3 2
2021-06-16T15:32:43.818泳池-1-线程-432021-06-16T15:32:43.818泳池-1-线程-2 4
2021-06-16T15:32:43.818泳池-1-线程-55
2021-06-16T15:32:43.818-1-线程-16

核心线程数为2,阻塞队列为5,生存时间为1分钟。所以任务流程为:

1.2.4 拒绝政策

上面提到了,线程池还有拒绝策略,即当前线程数=最大线程数,阻塞队列满了,会自动执行拒绝策略并抛出异常抛出。

代码示例

公共 静态   螺纹6() {ExecutorService executorService = new 2 10 1 时间单位微秒  ArrayBlockingQueue <>(5,));
对于int= 0;<20;++) {
int索引= i;
executorService.执行(() -> {
系统.out.println(LocalDateTime .现在()+ " " +线程当前线程().getName()+》《+》 索引);
尝试{
线程.睡眠(2000);} catch InterruptedException)  e) {
e.printStackTrace();
}
});
}
}
线程中出现异常“main” java.util.并发RejectedExecutionException任务com.ssw.配置.测试$$Lambda$1/1516369375@26f67b76被java拒绝。util .并发.消防@69d9c55[ 运行,池大小= 10, 活动线程 = 10 排队任务 = 5,已完成任务=0]
在java.util.并发.晚上$AbortPol冰冷被拒绝执行屁股java:2063)在javautil并发ThreadPoolExecutor.拒绝(ThreadPoolExecutor.java :830)
在javautil并发ThreadPoolExecutor.执行(ThreadPoolExecutor.java :1379)
在comssw配置测试.thread6(测试.java :93)
在comssw配置测试.main(测试.java :105)
2021-06-16T15:55:26.457泳池-1-线程-7 11
2021-06-16T15:55:26.457泳池-1-线程-9 13
2021-06-16T15:55:26.457泳池-1-线程-10 14
2021-06-16T15:55:26.457泳池-1-线程-2 12021-06-16T15:55:26.457泳池-1-线程-5 9
2021-06-16T15:55:26.457-1-线程-6 10
2021-06-16T15:55:26.457泳池-1-线程-10
2021-06-16T15:55:26.457泳池-1-线程-48
2021-06-16T15:55:26.457泳池-1-线程-812
2021-06-16T15:55:26.457泳池-1-线程-3 7
2021-06-16T15:55:28.463泳池-1-线程-8 2
2021-06-16T15:55:28.463泳池-1-线程-3 3
2021-06-16T15:55:28.463泳池-1-线程-142021-06-16T15:55:28.463泳池-1-线程-7 5
2021-06-16T15:55:28.463-1-线程-10 6

执行过程

2。项目实践

线程池用在

springboot项目中。场景如下:一个方法调用两个接口。如果接口1需要执行5秒,那么接口2需要执行8秒。

2.1 不使用线程池

不使用线程池,默认按顺序执行,5秒+8秒。此方法至少需要 13 秒才能完成。

@RequestMapping("线程")
公共字符串asd 抛出中断异常执行异常{
log.info("开始任务");字符串为 = test1(); 
字符串 bs = test2(); 
log.信息("任务结束");
返回+“---”+ bs;
}
public字符串test1() 抛出 InterruptedException {
线程.睡眠(5000);/ / 模拟时间 
log.info("test1执行结束");
返回 "123456";
}
公共字符串test2 抛出 InterruptedException {
线程.睡眠(8000);/ / 模拟时间 
log.info("test2执行结束");
返回 "456789";
}

该测试方法提供两个接口,打印出最大返回结果。

效率立马提高了吗?回到主题:一个请求需要5秒,一个请求需要8秒。如果同时执行两个请求,是否会在8秒内完成?效率会提高吗? ?

2.2 使用线程池ThreadPoolTask​​Executor

读完本文后,您还有什么疑问吗?上面提到的是ThreadPoolExecutor,为什么变成了ThreadPoolTask​​Executor呢?

让我告诉你两者之间的区别:

ThreadPoolTask​​Executor 使用 ThreadPoolExecutor 并对其进行增强,扩展更多功能;

ThreadPoolTask​​Executor只关注其增强部分,任务执行依然由ThreadPoolExecutor处理;

ThreadPoolTask​​Executor实现了InitializingBean、DisposableBean等,具有spring特性。 ThreadPoolExecutor不提供spring声明周期和参数组装;

您可以使用任何一种。如果是spring项目,建议使用ThreadPoolTask​​Executor。

2.2.1 异步线程池配置

@Log4j2
@EnableAsync
@配置公共  ThreadPoolTask​​Config {
@Bean(名称 = "任务执行器" )
public执行器taskExecutor(){
log.info“打开线程池”; 
ThreadPoolTask​​Executor 执行器 = new ThreadPoolTask​​Executor();
//核心线程数:创建线程池时初始化的线程数
执行器.setCorePoolSize(5);
//最大线程数:线程池中的最大线程数。只有缓冲队列满了,应用程序才会超过核心线程数
执行器.setMaxPoolSize(20);
//缓冲队列:用于缓冲执行任务的队列
执行器.setQueueCapacity(30);
//允许线程空闲时间60秒:当超过核心线程之外的线程,达到空闲时间后将被销毁
执行器.setKeepAliveSeconds(60);
//线程池名称的前缀:设置后可以方便我们定位处理任务所在的线程池执行器.setThreadNamePrefix("定时任务任务执行器-"); 
//线程池对被拒绝任务的处理策略:这里使用的是CallerRunsPolicy策略。当线程池没有处理能力时,该策略会直接在execute方法的调用线程中运行被拒绝的任务; 
 //如果执行器关闭,任务将被丢弃
执.来电运行策略()); 
 //执行初始化
执行器.初始化();
执行器.属性设置后();
返回执行者;
}
}

2.2.2 修改两个测试接口

公共 接口 TestService  {
未来<字符串>test1() ) 抛出中断异常;
未来<字符串>test2() ) 抛出中断异常;
}
@Log4j2@服务
公共  TestServiceImpl 实现 TestService {
@覆盖
@Async("任务执行器")
公共未来<字符串>测试1() )  抛出 InterruptedException  {{ 
线程.睡眠(5000);/ / 模拟时间 
log.info("发送短信方法----1执行结束"); 
返回  AsyncResult<  弦>("123456");
}
@覆盖
@Async("任务执行器")
公共未来<字符串>测试2() )  抛出 InterruptedException {{ 
线程.睡眠(8000);/ / 模拟时间 log.info("发送短信方法----2执行结束"); 
返回  AsyncResult<  弦>("456789");
}
}

2.2.3 控制层调用

@Log4j2
@RestController
@RequestMapping("测试")
公共TestThreadPoolController {
@Autowired
私有 TestService testService;
@RequestMapping("线程1")
公共字符串asd1() 抛出中断异常执行异常{
log.info("开始任务");
未来<String>= testService .测试1();未来<String> bs = testService .test2();
log.信息("任务结束");
返回.得到()+》---》+ bs.得到();
}
}

测试

至此,就完成了我们想要的效果,同步执行。其实呢,也不能说是使用线程池实现了并行执行,使用多线程也可以。我们只是将创建线程的操作,交给了线程池,在项目启动完成后就启动了若干线程,使用线程池来管理线程。

2.3 失效场景

2.3.1 必须增加@EnableAsync和@Async注解

@EnableAsync用来开启项目异步支持,@Async用来开启对某个方法进行异步执行。

2.3.2 异步方法需要纳入到Spring的bean中

通过@Component注解,或其他注解纳入Spring的bean容器中。

2.3.3 调用时通过自动装配

类中使用@Autowired或@Resource等注解自动注入,不能手动new对象。

2.3.4 异步方法不能使用static修饰

2.3.5 调用方不能和异步方法不能在同一个类

如下:

@RequestMapping("thread2")
public String asd12() throws InterruptedException, ExecutionException {
log.info("开始任务");
Future<String> as = test1();
Future<String> bs = test2();
log.info("结束任务");
return as.get() + "---" + bs.get();
}
@Async("taskExecutor")
public Future<String> test1() throws InterruptedException {
Thread.sleep(5000); // 模拟耗时
log.info("发送短信方法---- 1   执行结束");
return new AsyncResult<String>("123456");
}
@Async("taskExecutor")
public Future<String> test2() throws InterruptedException {
Thread.sleep(8000); // 模拟耗时
log.info("发送短信方法---- 2   执行结束");
return new AsyncResult<String>("456789");
}

调用方和所需要调用的异步方法在同一个类中

相关文章