动态可监控线程池框架DynamicTp

动态可监控线程池框架DynamicTp

使用线程池ThreadPoolExecutor过程中有以下痛点

  1. 代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
  2. 凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
  3. 线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题

如果有以上痛点,动态可监控线程池框架有帮助。

0. 项目整体架构

项目目录结构

img

img

img

img

img

模块详解

img

img

img

技术实现细节

img

项目优势 使用场景

img

依赖手动添加到maven

img

5个yml配置文件分析

img

img

img

img

1. dynamic-thread-pool-spring-boot-starter

模块结构

img


采集线程池配置数据

DynamicThreadPoolAutoConfig(动态配置入口)

  • Spring Boot 自动配置入口类

测试

img

img


img

img


ThreadPoolConfigEntit(线程池配置实体对象)

  • 封装一个线程池的当前状态(例如核心线程数、最大线程数、队列大小等)。

img


IDynamicThreadPoolService(动态线程池服务接口)

  • 抽象出动态线程池服务行为,便于后续扩展和测试。

img


DynamicThreadPoolService(动态线程池服务)

  • 动态线程池的实现类
  • 实现线程池的增删查改。
  • 管理多个线程池,通过线程池名字访问对应配置。
  • 方法说明:
    • queryThreadPoolList():返回所有线程池当前状态列表。
    • queryThreadPoolConfigByName(name):获取指定线程池的状态。
    • updateThreadPoolConfig(entity):根据传入参数更新某线程池核心参数。

img

img

img

img

img



线程池数据上报(Redis注册中心)

DynamicThreadPoolAutoConfig(动态配置入口)

  • redissonClient 方法创建了一个 RedissonClient 实例,用来与 Redis 服务器连接,配置包括 Redis 地址、连接池大小、密码等。
    redissonClient 方法这段代码比较固定,为了连接Redis。

img

img

img

img

img


DynamicThreadPoolAutoProperties(动态线程池配置)

  • 连Redis需要配置,故需要一个动态线程池配置文件。
  • 用于绑定 application.yml 中 dynamic.thread.pool.config 配置的属性类
  • 把 Redis 的连接配置(host、port、密码、线程池配置参数)映射为 Java 对象,供自动配置类 DynamicThreadPoolAutoConfig 使用。

img


IRegistry(注册中心接口)

  • reportThreadPool(List threadPoolEntities):该方法接收一个 ThreadPoolConfigEntity 类型的列表,上报一组线程池的配置信息。
  • reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity):该方法接收一个 ThreadPoolConfigEntity 类型的单个线程池配置,上报单个线程池的详细配置信息。

img


RedisRegistry(Redis注册中心)

注册中心的实现方式有很多种,Redis是其中一种。

  • RedisRegistry 实现了 IRegistry 接口,这个类的核心目的是将应用中的线程池信息实时保存到 Redis 中,供其他系统或应用进行查询和使用。
  • reportThreadPool:接收所有线程池的配置列表,负责上传线程池列表,覆盖 Redis 中的原有线程池列表。
  • reportThreadPoolConfigParameter:接收单个线程池的配置,负责上传单个线程池的详细配置,并为其设置有效期(30 天)。

img

  • private final RedissonClient redissonClient;
    这里声明了一个 RedissonClient 类型的变量,名为 redissonClient。RedissonClient 是 Redis 的客户端,帮助我们在程序中与 Redis 进行交互,发送和获取数据。
  • public RedisRegistry(RedissonClient redissonClient)
    这是 RedisRegistry 类的构造函数。构造函数的作用是初始化 RedisRegistry 对象时,将传入的 redissonClient(一个已经配置好的 Redis 客户端对象)赋值给类中的 redissonClient 变量。这样,类的其他方法就可以使用这个客户端来与 Redis 交互。
  • RList list = redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());
    redissonClient.getList(…) 是 Redisson 客户端提供的一个方法,它返回一个可以与 Redis 中的列表进行交互的对象。RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey() 获取到的是一个字符串,表示 Redis 中的键,用来标识存储线程池配置信息的地方。
    list.delete();
    调用 delete() 方法删除 Redis 中已存储的列表。这样做是为了确保每次更新时,Redis 中只保存最新的配置信息,而不保留之前的旧数据。
    list.addAll(threadPoolEntities);
    将传入的 threadPoolEntities 列表添加到 Redis 中。这一行代码的作用是将传入的线程池配置列表上传到 Redis 中,替代原来的数据。
  • String cacheKey = RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey() + ““ + threadPoolConfigEntity.getAppName() + ““ + threadPoolConfigEntity.getThreadPoolName();
    这里构建了一个缓存键 cacheKey,用于存储线程池配置信息。这个键由以上几部分组成。
  • RBucket bucket = redissonClient.getBucket(cacheKey);
    redissonClient.getBucket(cacheKey) 是 Redisson 客户端提供的一个方法,用来获取 Redis 中一个键(cacheKey)对应的单个对象。这里的 RBucket 就是 Redis 中保存单个对象的容器。
  • bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
    ucket.set(…) 将 threadPoolConfigEntity(线程池配置信息)存入 Redis 中,并设置该数据的有效期为 30 天。也就是说,这些数据在 Redis 中保存 30 天,之后会被自动删除。

RegistryEnumVO(注册中心枚举值对象 key)

  • 统一管理 Redis 中的键名,避免代码中到处写字符串,方便维护和修改。

img


ThreadPoolDataReportJob(线程池数据上报任务)

  • 一个定时任务类,定期查询动态线程池的配置信息,上报到一个指定的 registry,并且会记录日志以便追踪。

img

  • dynamicThreadPoolService:这是一个接口类型的变量,用于调用动态线程池服务。它的作用是获取当前线程池的配置。
    registry:一个接口类型的变量,负责将获取到的线程池配置信息报告(存储)到 Redis 中
  • public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {…}
    这个构造函数接收两个参数,并将它们赋值给类中的对应变量。这样,类中的方法就可以使用这两个对象来获取线程池配置并将其上报。
  • @Scheduled(cron = “0/20 * * * * ?”)
    这是 Spring 框架中的一个注解,表示该方法是一个定时任务。cron 表达式 “0/20 * * * * ?” 表示每 20 秒执行一次该方法。
  • List threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
    获取当前系统中所有线程池的配置信息。
  • registry.reportThreadPool(threadPoolConfigEntities);
    将获取到的线程池配置列表 threadPoolConfigEntities 上报(存储)到 registry。
  • JSON.toJSONString(threadPoolConfigEntities) 是将线程池配置对象转换为 JSON 字符串
  • registry.reportThreadPoolConfigParameter(threadPoolConfigEntity)
    将当前线程池的详细配置上传。

img



订阅发布信息,变更线程池

DynamicThreadPoolAutoConfig 动态配置入口

img

img

  • redissonClient 连接 Redis 的客户端
    这个方法是用来创建和 Redis 的连接的,叫RedissonClient。

  • redisRegistry 注册器(写入 Redis)
    这是一个工具类,用于把线程池的配置信息写到 Redis 中。
    它里面定义了俩方法:上报线程池列表 上报线程池参数。

  • threadPoolDataReportJob 定时上报线程池信息任务类
    它每隔一段时间(比如 20 秒)会自动执行,把当前系统的线程池信息打包上传到 Redis。

  • dynamicThreadPollService 动态线程池服务类(初始化配置)
    这个 Bean 在项目启动时做两件事:
    ① 获取当前项目名applicationName
    从配置文件中读取你这个项目叫什么,如果没写,就给它一个默认值:“缺省的”。
    ② 遍历你所有的线程池,然后去 Redis 找配置。
    简单来说就是启动时,把 Redis 里保存的线程池配置读出来,应用到每一个线程池中去。

  • threadPoolConfigAdjustListener 配置监听器

    @Bean

    public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(…)

    这个监听器的作用是:“当 Redis 有消息说要改线程池配置,我就去执行修改操作”。
    比如 Redis 发来一条消息说:“把 MainPool 的最大线程数改成 80。”它就会立刻改掉这个线程池,并同步状态回 Redis。

  • threadPoolConfigAdjustListener 监听 Redis 消息频道
    @Bean(name = “dynamicThreadPoolRedisTopic”)
    public RTopic threadPoolConfigAdjustListener(…)
    redissonClient.getTopic(…):获取 Redis 的一个消息通道(像广播频道) topic.addListener(…):监听这个频道的内容(别人发消息我就听) 如果 Redis 发出线程池配置变更的通知,程序就会马上收到,并通过刚才那个监听器执行更改。

img

ThreadPoolConfigAdjustListener 动态线程池变更监听

  • 这是一个“监听器”,用来监听 Redis 中的消息。
  • 它实现了 MessageListener,说明它能接收类型为 ThreadPoolConfigEntity 的消息(即:别人发来的“线程池配置”对象)。
  • 这是一个监听 Redis 消息的类,每当有线程池配置变更的消息,它就会接收、执行更新、并同步到 Redis。

img

  • 核心方法
    onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity)
    接收消息并更新线程池

img

  • ① 打印收到的配置
    logger.info(“动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}”,

    这行日志的作用是告诉你:“我收到了新配置,现在要把线程池 X 的核心线程数改成 Y,最大线程数改成 Z。”

  • ② 调用线程池服务类,更新本地线程池配置
    dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);
    拿着刚才收到的配置对象,去调用 dynamicThreadPoolService 里面的更新方法
    它会找到你本地程序中的那个线程池对象(根据线程池名),然后给它设定新的参数

  • ③ 上报当前所有线程池状态
    List threadPoolConfigEntities= dynamicThreadPoolService.queryThreadPoolList();
    registry.reportThreadPool(threadPoolConfigEntities);
    更新完一个线程池后,我们把当前所有线程池的配置重新上传一次到 Redis
    registry.reportThreadPool(...) 就是把线程池列表写入 Redis。

  • ④ 上传刚才修改的这个线程池的详细配置
    ThreadPoolConfigEntity threadPoolConfigEntityCurrent = dynamicThreadPoolService.queryThreadPoolConfigByName(threadPoolConfigEntity.getThreadPoolName()); registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
    单独查询一下刚才修改的那个线程池的“详细配置”
    然后把它单独上传给 Redis(方便别人只看这个线程池的当前状态)

  • ⑤ 记录日志

告警通知

image-20250904095737924

image-20250904095759443

2. dynamic-thread-pool-admin

管理端工程搭建,提供接口

模块结构

这个模块是一个“动态线程池控制后台系统”,主要有三个功能:

  • 从 Redis 中读取线程池配置和状态(查)
  • 提供接口让用户可以修改线程池配置(改)
  • 通过 Redis 广播消息,实现配置变更后线程池参数自动更新(动)

img点击并拖拽以移动编辑

DynamicThreadPoolController 管理线程池配置的后端接口

别人(前端或你自己)可以通过它来“看配置”“查配置”“改配置”。

img

@RestController 表示这是一个“控制器类”,专门用来响应前端的请求,返回 JSON 数据

@CrossOrigin(“*“) 允许跨域访问(前端不在同一个服务器也能访问这个接口)

@RequestMapping(…) 给这个控制器加个统一前缀路径,后面所有接口都以这个路径开头

注入 Redis 客户端

@Resource public RedissonClient redissonClient;

自动从 Spring 容器中拿到一个 Redis 客户端对象,我们后面就可以用它来读写 Redis 数据


img

public Response<List> queryThreadPoolList() {…}

  • 作用:从 Redis 中读取保存的“所有线程池配置列表”,然后返回。
  • 从 Redis 拿到一个叫 “THREAD_POOL_CONFIG_LIST_KEY” 的列表(Redis 类型为 list),里面保存的是多个ThreadPoolConfigEntity对象(每一个表示一个线程池的配置)
  • 调用 .readAll() 把整个列表一次性读出来。
  • RList 是 Redisson 提供的 Redis 列表结构(相当于 Redis 中的 List)。.readAll() 的意思是把整个 Redis 列表一次性读成 Java 的 List 返回。

img

public Response queryThreadPoolConfig(@RequestParam String appName,@RequestParam String threadPoolName){…}

  • 作用:查询某个线程池的配置
  • @RequestParam:从 URL 参数中读取
    如: ‘http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_config? * appName=dynamic-thread-pool-test-app&threadPoolName=threadPoolExecutor’
  • 从 Redis 中读取一个“单个值”类型的数据,Redis 里这种类型叫 String,Redisson 里用 RBucket 来表示。
  • .getBucket(key) 拿到 Redis 中的某个 key(这个 key 存的是一个对象)
    .get() 把 key 对应的值(对象)拿出来

img

public Response updateThreadPoolConfig(@RequestBody ThreadPoolConfigEntity request) {…}

  • 作用:接收一个新的线程池配置(从前端传来的 JSON),然后通过 Redis 的“广播”机制,把这个配置发送出去,告诉其他服务该改线程池参数了
  • @RequestBody:表示请求体是 JSON 格式,自动把前端发来的 JSON 转换成 ThreadPoolConfigEntity 对象
  • RTopic topic = redissonClient.getTopic(“DYNAMIC_THREAD_POOL_REDIS_TOPIC_” + request.getAppName());
    topic.publish(request);
    getTopic(“xxx”) 获取 Redis 的发布订阅频道(Pub/Sub 机制)
    publish(request) 向这个频道发送一个消息(对象会被序列化为 JSON)
    所有订阅了这个 Topic 的监听器(如监听器类 ThreadPoolConfigAdjustListener)都会马上收到这个消息,然后执行线程池更新

ThreadPoolConfigEntity 通用线程池配置数据结构

表示要保存或传递的“线程池配置信息”,比如:应用名 appName,线程池名 threadPoolName,核心线程数 corePoolSize,最大线程数 maximumPoolSize

img


Response 通用响应封装

img

imgimg

这段代码是 Spring Boot 项目的启动类,它的主要作用是:

启动项目,并配置 Redisson Redis 客户端,完成 Redis 的连接和参数设置。

img点击并拖拽以移动编辑

img

3. dynamic-thread-pool-test

img点击并拖拽以移动编辑

img

img

项目背景与痛点

“在我们的业务系统中,线程池是一个非常重要的组件,用于管理和复用线程资源。但在实际使用过程中,我们遇到了几个痛点:

  • 传统的ThreadPoolExecutor一旦创建,核心参数如corePoolSize、maximumPoolSize等就固定了,如果需要调整,通常需要重启应用,这在生产环境中是非常不便的;
  • 线程池运行状态缺乏有效监控,当出现问题时,如线程耗尽、任务队列堆积等情况,很难及时发现;
  • 线程池参数调优是一个复杂的过程,需要根据实际业务负载情况不断调整,缺乏有效的工具支持这一过程。

针对这些问题,我设计并实现了这个动态线程池组件。”

技术难点与解决方案

“在实现过程中,我遇到并解决了几个技术难点:

  • 线程池参数动态调整 :
    通过Redis的发布订阅机制,实现了配置变更的实时通知
    在SDK中实现了配置监听器,接收到变更后立即调整线程池参数
  • 多实例配置一致性 :
    使用Redis作为配置中心,确保同一应用的多个实例使用相同的线程池配置
    应用启动时会从Redis加载最新配置,确保配置一致性
  • 告警机制设计 :
    设计了可配置的告警规则,支持活跃线程数和队列容量两种告警策略
    实现了可扩展的告警通知接口,默认提供日志告警,可扩展为邮件、短信等方式
  • 线程池状态监控 :
    通过定时任务定期采集线程池状态并上报到Redis
    在控制台可实时查看线程池运行状态,包括活跃线程数、队列大小等关键指标”

动态可监控线程池框架DynamicTp
https://blog.xirui.work/posts/b621866a.html
作者
xirui
发布于
2024年11月9日
许可协议