2团
Published on 2024-08-16 / 32 Visits
0
0

Spring Boot设置业务线程池优雅关闭

1. 业务背景

最近项目运行,发现某热点业务流程,频繁调用Mongo进行入库操作。此处已经成为性能瓶颈,需要对其进行优化。

因为入库数据主要是作为审计进行使用,对入库的时间实时性要求不高,且可以容忍在意外情况下丢失部分数据。

基于此点,设计使用内存阻塞队列存储Mongo入库请求(削峰填谷),然后使用固定尺寸线程池拉取入库请求,通过批量插入Mongo数据库以提升业务流程效率。

2. 线程池设计

实际业务运行中,意外Crash的概率还是比较小的,大多数时候服务都是主动关闭执行升级或Bug Fix操作。

在服务主动关闭的场景下,如何优雅的关闭线程池,并且尽可能的消费内存队列中的入库请求,就成为了一个关键点。

2.1 线程池配置

@Slf4j
@Configuration
public class ApplicationIOThreadPoolConfig {

    // 应用运行标志
    private final static AtomicBoolean RUNNING_FLAG = new AtomicBoolean(true);

    private final MongoTemplate mongoTemplate;

    /**
     * 固定数量线程池,用于处理IO密集型任务
     */
    private final ThreadPoolTaskExecutor ioThreadPool;

    public ApplicationIOThreadPoolConfig(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
        this.ioThreadPool = new ThreadPoolTaskExecutor();
        initIoThreadPoolConfig();
    }

    private void initIoThreadPoolConfig() {
        ioThreadPool.setCorePoolSize(4);
        ioThreadPool.setMaxPoolSize(4);
        ioThreadPool.setKeepAliveSeconds(0);
        ioThreadPool.setQueueCapacity(128);
        ioThreadPool.setThreadNamePrefix("io-thread-");
        ioThreadPool.setWaitForTasksToCompleteOnShutdown(true);
        ioThreadPool.setAwaitTerminationSeconds(3);
        // 需要执行初始化操作
        ioThreadPool.initialize();
    }
}

创建应用IO线程池Bean,内部包含一个线程池(线程数量固定为4)。

需要注意如下配置:

ioThreadPool.setWaitForTasksToCompleteOnShutdown(true);
ioThreadPool.setAwaitTerminationSeconds(3);

其中:

(1)setWaitForTasksToCompleteOnShutdown:线程池关闭的时候等待线程池内任务完成再继续销毁其他的Bean,这样线程池内的任务销毁就会提前于当前配置里的MongoTemplate销毁(避免MongoTemplate提前销毁,导致阻塞队列里的入库请求丢失)。

(2)setAwaitTerminationSeconds:设置线程池中任务的等待时间,如果超过这个时间,线程池中的任务还没有结束就强制进行销毁,确保应用能够被终止。

2.2 线程任务创建

// 应用启动时,启动四线程从阻塞队列消费入库请求,批量插入Mongo数据库
// 此处线程池不对外开放,仅用于执行数据入库操作
@EventListener(classes = ApplicationReadyEvent.class)
public void onApplicationReadyEvent(@NotNull ApplicationReadyEvent event) {
    for (int i = 0; i < 4; i++) {
        ioThreadPool.execute(this::initTerminalReportMsgThread);
    }
}

// 应用关闭时,设置运行标志为false,以便正在执行的任务退出;继而执行线程池关闭操作    
@EventListener(classes = ContextClosedEvent.class)
public void onApplicationCloseEvent(@NotNull ContextClosedEvent event) {
    RUNNING_FLAG.set(false);
    this.ioThreadPool.shutdown();
    log.info("Receive context closed event({}), shutdown io thread pool", event);
}

此处在应用启动时,监听ApplicationReadyEvent事件,当应用启动完成时,创建固定的4线程任务。

2.3 任务终止判定

private void initTerminalReportMsgThread() {
    List<TerminalReportMsg> terminalReportMsgs = new ArrayList<>(32);
    long start = System.currentTimeMillis();
    while (RUNNING_FLAG.get()) {
        try {
            // 尝试从阻塞队列拉取入库请求(使用poll,设置等待时间,避免长时间阻塞在此处)
            TerminalReportMsg msg = Constants.replyMessageBlockingQueue.poll(10, MILLISECONDS);
            if (msg != null) {
                terminalReportMsgs.add(msg);
                // 超过32个请求,则
                if (terminalReportMsgs.size() >= 32) {
                    mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
                    terminalReportMsgs.clear();
                    long end = System.currentTimeMillis();
                    log.info("Save terminal report msg to mongo, cost {} ms", end - start);
                    start = end;
                }
            } else {
                // 超过100ms,直接执行入库操作;
                // 避免在较长的时间段内数据量较少(小于32),导致前序数据无法及时入库
                if (System.currentTimeMillis() - start > 100) {
                    if (CollUtil.isNotEmpty(terminalReportMsgs)) {
                        mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
                        terminalReportMsgs.clear();
                    }
                    start = System.currentTimeMillis();
                }
            }
        } catch (Exception e) {
            log.error("Get terminal report msg or send terminal report msg failed", e);
        }
    }

    // 应用终止,此时尽可能的拉取队列中的数据,避免数据丢失
    log.info("Thread is interrupted, get all terminal report msg from queue");
    while (true) {
        try {
            TerminalReportMsg msg = Constants.replyMessageBlockingQueue.poll(10, MILLISECONDS);
            if (msg == null) {
                break;
            }
            terminalReportMsgs.add(msg);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    if (CollUtil.isNotEmpty(terminalReportMsgs)) {
        mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
    }
    log.info("Thread is interrupted, save all terminal report msg to mongo");
}

上述代码,是线程池中运行的数据拉取任务,具体内容可见注释描述。

3 补充

线程任务中,使用原子变量RUNNING_FLAG判断应用是否终止,并非使用interrupted的原因在于:

(1)当前线程任务是while循环,不会执行一段时间后自动退出;

(2)线程池执行shutdown的时候,正在运行的线程较难捕获到interputed标志变化(时效)。

基于以上两点,使用原子变量简化判断。


Comment