动态可监控线程池框架DynamicTp
动态可监控线程池框架DynamicTp
使用线程池ThreadPoolExecutor过程中有以下痛点
- 代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
- 凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
- 线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题
如果有以上痛点,动态可监控线程池框架有帮助。
0. 项目整体架构
项目目录结构





模块详解



技术实现细节

项目优势 使用场景

依赖手动添加到maven

5个yml配置文件分析




1. dynamic-thread-pool-spring-boot-starter
模块结构

采集线程池配置数据
DynamicThreadPoolAutoConfig(动态配置入口)
- Spring Boot 自动配置入口类
测试



ThreadPoolConfigEntit(线程池配置实体对象)
- 封装一个线程池的当前状态(例如核心线程数、最大线程数、队列大小等)。

IDynamicThreadPoolService(动态线程池服务接口)
- 抽象出动态线程池服务行为,便于后续扩展和测试。

DynamicThreadPoolService(动态线程池服务)
- 动态线程池的实现类
- 实现线程池的增删查改。
- 管理多个线程池,通过线程池名字访问对应配置。
- 方法说明:
queryThreadPoolList():返回所有线程池当前状态列表。queryThreadPoolConfigByName(name):获取指定线程池的状态。updateThreadPoolConfig(entity):根据传入参数更新某线程池核心参数。





线程池数据上报(Redis注册中心)
DynamicThreadPoolAutoConfig(动态配置入口)
- redissonClient 方法创建了一个 RedissonClient 实例,用来与 Redis 服务器连接,配置包括 Redis 地址、连接池大小、密码等。
redissonClient 方法这段代码比较固定,为了连接Redis。





DynamicThreadPoolAutoProperties(动态线程池配置)
- 连Redis需要配置,故需要一个动态线程池配置文件。
- 用于绑定 application.yml 中 dynamic.thread.pool.config 配置的属性类
- 把 Redis 的连接配置(host、port、密码、线程池配置参数)映射为 Java 对象,供自动配置类 DynamicThreadPoolAutoConfig 使用。

IRegistry(注册中心接口)
- reportThreadPool(List
threadPoolEntities):该方法接收一个 ThreadPoolConfigEntity 类型的列表,上报一组线程池的配置信息。 - reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity):该方法接收一个 ThreadPoolConfigEntity 类型的单个线程池配置,上报单个线程池的详细配置信息。

RedisRegistry(Redis注册中心)
注册中心的实现方式有很多种,Redis是其中一种。
- RedisRegistry 实现了 IRegistry 接口,这个类的核心目的是将应用中的线程池信息实时保存到 Redis 中,供其他系统或应用进行查询和使用。
- reportThreadPool:接收所有线程池的配置列表,负责上传线程池列表,覆盖 Redis 中的原有线程池列表。
- reportThreadPoolConfigParameter:接收单个线程池的配置,负责上传单个线程池的详细配置,并为其设置有效期(30 天)。

- private final RedissonClient redissonClient;
这里声明了一个 RedissonClient 类型的变量,名为 redissonClient。RedissonClient 是 Redis 的客户端,帮助我们在程序中与 Redis 进行交互,发送和获取数据。 - public RedisRegistry(RedissonClient redissonClient)
这是 RedisRegistry 类的构造函数。构造函数的作用是初始化 RedisRegistry 对象时,将传入的 redissonClient(一个已经配置好的 Redis 客户端对象)赋值给类中的 redissonClient 变量。这样,类的其他方法就可以使用这个客户端来与 Redis 交互。 - RList
list = redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());
redissonClient.getList(…) 是 Redisson 客户端提供的一个方法,它返回一个可以与 Redis 中的列表进行交互的对象。RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey() 获取到的是一个字符串,表示 Redis 中的键,用来标识存储线程池配置信息的地方。
list.delete();
调用 delete() 方法删除 Redis 中已存储的列表。这样做是为了确保每次更新时,Redis 中只保存最新的配置信息,而不保留之前的旧数据。
list.addAll(threadPoolEntities);
将传入的 threadPoolEntities 列表添加到 Redis 中。这一行代码的作用是将传入的线程池配置列表上传到 Redis 中,替代原来的数据。 - String cacheKey = RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey() + ““ + threadPoolConfigEntity.getAppName() + ““ + threadPoolConfigEntity.getThreadPoolName();
这里构建了一个缓存键 cacheKey,用于存储线程池配置信息。这个键由以上几部分组成。 - RBucket
bucket = redissonClient.getBucket(cacheKey);
redissonClient.getBucket(cacheKey) 是 Redisson 客户端提供的一个方法,用来获取 Redis 中一个键(cacheKey)对应的单个对象。这里的 RBucket 就是 Redis 中保存单个对象的容器。 - bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
ucket.set(…) 将 threadPoolConfigEntity(线程池配置信息)存入 Redis 中,并设置该数据的有效期为 30 天。也就是说,这些数据在 Redis 中保存 30 天,之后会被自动删除。
RegistryEnumVO(注册中心枚举值对象 key)
- 统一管理 Redis 中的键名,避免代码中到处写字符串,方便维护和修改。

ThreadPoolDataReportJob(线程池数据上报任务)
- 一个定时任务类,定期查询动态线程池的配置信息,上报到一个指定的 registry,并且会记录日志以便追踪。

- dynamicThreadPoolService:这是一个接口类型的变量,用于调用动态线程池服务。它的作用是获取当前线程池的配置。
registry:一个接口类型的变量,负责将获取到的线程池配置信息报告(存储)到 Redis 中 - public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {…}
这个构造函数接收两个参数,并将它们赋值给类中的对应变量。这样,类中的方法就可以使用这两个对象来获取线程池配置并将其上报。 - @Scheduled(cron = “0/20 * * * * ?”)
这是 Spring 框架中的一个注解,表示该方法是一个定时任务。cron 表达式 “0/20 * * * * ?” 表示每 20 秒执行一次该方法。 - List
threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
获取当前系统中所有线程池的配置信息。 - registry.reportThreadPool(threadPoolConfigEntities);
将获取到的线程池配置列表 threadPoolConfigEntities 上报(存储)到 registry。 - JSON.toJSONString(threadPoolConfigEntities) 是将线程池配置对象转换为 JSON 字符串
- registry.reportThreadPoolConfigParameter(threadPoolConfigEntity)
将当前线程池的详细配置上传。

订阅发布信息,变更线程池
DynamicThreadPoolAutoConfig 动态配置入口


redissonClient 连接 Redis 的客户端
这个方法是用来创建和 Redis 的连接的,叫RedissonClient。redisRegistry 注册器(写入 Redis)
这是一个工具类,用于把线程池的配置信息写到 Redis 中。
它里面定义了俩方法:上报线程池列表 上报线程池参数。threadPoolDataReportJob 定时上报线程池信息任务类
它每隔一段时间(比如 20 秒)会自动执行,把当前系统的线程池信息打包上传到 Redis。dynamicThreadPollService 动态线程池服务类(初始化配置)
这个 Bean 在项目启动时做两件事:
① 获取当前项目名applicationName
从配置文件中读取你这个项目叫什么,如果没写,就给它一个默认值:“缺省的”。
② 遍历你所有的线程池,然后去 Redis 找配置。
简单来说就是启动时,把 Redis 里保存的线程池配置读出来,应用到每一个线程池中去。threadPoolConfigAdjustListener 配置监听器
@Bean
public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(…)
这个监听器的作用是:“当 Redis 有消息说要改线程池配置,我就去执行修改操作”。
比如 Redis 发来一条消息说:“把MainPool的最大线程数改成 80。”它就会立刻改掉这个线程池,并同步状态回 Redis。threadPoolConfigAdjustListener 监听 Redis 消息频道
@Bean(name = “dynamicThreadPoolRedisTopic”)
public RTopic threadPoolConfigAdjustListener(…)
redissonClient.getTopic(…):获取 Redis 的一个消息通道(像广播频道) topic.addListener(…):监听这个频道的内容(别人发消息我就听) 如果 Redis 发出线程池配置变更的通知,程序就会马上收到,并通过刚才那个监听器执行更改。

ThreadPoolConfigAdjustListener 动态线程池变更监听
- 这是一个“监听器”,用来监听 Redis 中的消息。
- 它实现了 MessageListener
,说明它能接收类型为 ThreadPoolConfigEntity 的消息(即:别人发来的“线程池配置”对象)。 - 这是一个监听 Redis 消息的类,每当有线程池配置变更的消息,它就会接收、执行更新、并同步到 Redis。

- 核心方法
onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity)
接收消息并更新线程池

① 打印收到的配置
logger.info(“动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}”,这行日志的作用是告诉你:“我收到了新配置,现在要把线程池 X 的核心线程数改成 Y,最大线程数改成 Z。”
② 调用线程池服务类,更新本地线程池配置
dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);
拿着刚才收到的配置对象,去调用dynamicThreadPoolService里面的更新方法
它会找到你本地程序中的那个线程池对象(根据线程池名),然后给它设定新的参数③ 上报当前所有线程池状态
ListthreadPoolConfigEntities= dynamicThreadPoolService.queryThreadPoolList();
registry.reportThreadPool(threadPoolConfigEntities);
更新完一个线程池后,我们把当前所有线程池的配置重新上传一次到 Redis
registry.reportThreadPool(...)就是把线程池列表写入 Redis。④ 上传刚才修改的这个线程池的详细配置
ThreadPoolConfigEntity threadPoolConfigEntityCurrent = dynamicThreadPoolService.queryThreadPoolConfigByName(threadPoolConfigEntity.getThreadPoolName()); registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
单独查询一下刚才修改的那个线程池的“详细配置”
然后把它单独上传给 Redis(方便别人只看这个线程池的当前状态)⑤ 记录日志
告警通知


2. dynamic-thread-pool-admin
管理端工程搭建,提供接口
模块结构
这个模块是一个“动态线程池控制后台系统”,主要有三个功能:
- 从 Redis 中读取线程池配置和状态(查)
- 提供接口让用户可以修改线程池配置(改)
- 通过 Redis 广播消息,实现配置变更后线程池参数自动更新(动)


DynamicThreadPoolController 管理线程池配置的后端接口
别人(前端或你自己)可以通过它来“看配置”“查配置”“改配置”。

@RestController 表示这是一个“控制器类”,专门用来响应前端的请求,返回 JSON 数据
@CrossOrigin(“*“) 允许跨域访问(前端不在同一个服务器也能访问这个接口)
@RequestMapping(…) 给这个控制器加个统一前缀路径,后面所有接口都以这个路径开头
注入 Redis 客户端
@Resource public RedissonClient redissonClient;
自动从 Spring 容器中拿到一个 Redis 客户端对象,我们后面就可以用它来读写 Redis 数据

public Response<List
> queryThreadPoolList() {…}
- 作用:从 Redis 中读取保存的“所有线程池配置列表”,然后返回。
- 从 Redis 拿到一个叫 “THREAD_POOL_CONFIG_LIST_KEY” 的列表(Redis 类型为 list),里面保存的是多个ThreadPoolConfigEntity对象(每一个表示一个线程池的配置)
- 调用 .readAll() 把整个列表一次性读出来。
- RList 是 Redisson 提供的 Redis 列表结构(相当于 Redis 中的 List)。.readAll() 的意思是把整个 Redis 列表一次性读成 Java 的 List
返回。

public Response
queryThreadPoolConfig(@RequestParam String appName,@RequestParam String threadPoolName){…}
- 作用:查询某个线程池的配置
- @RequestParam:从 URL 参数中读取
如: ‘http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_config? * appName=dynamic-thread-pool-test-app&threadPoolName=threadPoolExecutor’- 从 Redis 中读取一个“单个值”类型的数据,Redis 里这种类型叫 String,Redisson 里用 RBucket 来表示。
- .getBucket(key) 拿到 Redis 中的某个 key(这个 key 存的是一个对象)
.get() 把 key 对应的值(对象)拿出来

public Response
updateThreadPoolConfig(@RequestBody ThreadPoolConfigEntity request) {…}
- 作用:接收一个新的线程池配置(从前端传来的 JSON),然后通过 Redis 的“广播”机制,把这个配置发送出去,告诉其他服务该改线程池参数了
- @RequestBody:表示请求体是 JSON 格式,自动把前端发来的 JSON 转换成 ThreadPoolConfigEntity 对象
- RTopic topic = redissonClient.getTopic(“DYNAMIC_THREAD_POOL_REDIS_TOPIC_” + request.getAppName());
topic.publish(request);
getTopic(“xxx”) 获取 Redis 的发布订阅频道(Pub/Sub 机制)
publish(request) 向这个频道发送一个消息(对象会被序列化为 JSON)
所有订阅了这个 Topic 的监听器(如监听器类ThreadPoolConfigAdjustListener)都会马上收到这个消息,然后执行线程池更新
ThreadPoolConfigEntity 通用线程池配置数据结构
表示要保存或传递的“线程池配置信息”,比如:应用名 appName,线程池名 threadPoolName,核心线程数 corePoolSize,最大线程数 maximumPoolSize

Response
通用响应封装



这段代码是 Spring Boot 项目的启动类,它的主要作用是:
启动项目,并配置 Redisson Redis 客户端,完成 Redis 的连接和参数设置。



3. dynamic-thread-pool-test




项目背景与痛点
“在我们的业务系统中,线程池是一个非常重要的组件,用于管理和复用线程资源。但在实际使用过程中,我们遇到了几个痛点:
- 传统的ThreadPoolExecutor一旦创建,核心参数如corePoolSize、maximumPoolSize等就固定了,如果需要调整,通常需要重启应用,这在生产环境中是非常不便的;
- 线程池运行状态缺乏有效监控,当出现问题时,如线程耗尽、任务队列堆积等情况,很难及时发现;
- 线程池参数调优是一个复杂的过程,需要根据实际业务负载情况不断调整,缺乏有效的工具支持这一过程。
针对这些问题,我设计并实现了这个动态线程池组件。”
技术难点与解决方案
“在实现过程中,我遇到并解决了几个技术难点:
- 线程池参数动态调整 :
通过Redis的发布订阅机制,实现了配置变更的实时通知
在SDK中实现了配置监听器,接收到变更后立即调整线程池参数- 多实例配置一致性 :
使用Redis作为配置中心,确保同一应用的多个实例使用相同的线程池配置
应用启动时会从Redis加载最新配置,确保配置一致性- 告警机制设计 :
设计了可配置的告警规则,支持活跃线程数和队列容量两种告警策略
实现了可扩展的告警通知接口,默认提供日志告警,可扩展为邮件、短信等方式- 线程池状态监控 :
通过定时任务定期采集线程池状态并上报到Redis
在控制台可实时查看线程池运行状态,包括活跃线程数、队列大小等关键指标”
