1. 业务背景
最近项目运行,发现某热点业务流程,频繁调用Mongo进行入库操作。此处已经成为性能瓶颈,需要对其进行优化。
因为入库数据主要是作为审计进行使用,对入库的时间实时性要求不高,且可以容忍在意外情况下丢失部分数据。
基于此点,设计使用内存阻塞队列存储Mongo入库请求(削峰填谷),然后使用固定尺寸线程池拉取入库请求,通过批量插入Mongo数据库以提升业务流程效率。
2. 线程池设计
实际业务运行中,意外Crash的概率还是比较小的,大多数时候服务都是主动关闭执行升级或Bug Fix操作。
在服务主动关闭的场景下,如何优雅的关闭线程池,并且尽可能的消费内存队列中的入库请求,就成为了一个关键点。
2.1 线程池配置
@Slf4j
@Configuration
public class ApplicationIOThreadPoolConfig {
// 应用运行标志
private final static AtomicBoolean RUNNING_FLAG = new AtomicBoolean(true);
private final MongoTemplate mongoTemplate;
/**
* 固定数量线程池,用于处理IO密集型任务
*/
private final ThreadPoolTaskExecutor ioThreadPool;
public ApplicationIOThreadPoolConfig(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
this.ioThreadPool = new ThreadPoolTaskExecutor();
initIoThreadPoolConfig();
}
private void initIoThreadPoolConfig() {
ioThreadPool.setCorePoolSize(4);
ioThreadPool.setMaxPoolSize(4);
ioThreadPool.setKeepAliveSeconds(0);
ioThreadPool.setQueueCapacity(128);
ioThreadPool.setThreadNamePrefix("io-thread-");
ioThreadPool.setWaitForTasksToCompleteOnShutdown(true);
ioThreadPool.setAwaitTerminationSeconds(3);
// 需要执行初始化操作
ioThreadPool.initialize();
}
}
创建应用IO线程池Bean,内部包含一个线程池(线程数量固定为4)。
需要注意如下配置:
ioThreadPool.setWaitForTasksToCompleteOnShutdown(true);
ioThreadPool.setAwaitTerminationSeconds(3);
其中:
(1)setWaitForTasksToCompleteOnShutdown:线程池关闭的时候等待线程池内任务完成再继续销毁其他的Bean,这样线程池内的任务销毁就会提前于当前配置里的MongoTemplate销毁(避免MongoTemplate提前销毁,导致阻塞队列里的入库请求丢失)。
(2)setAwaitTerminationSeconds:设置线程池中任务的等待时间,如果超过这个时间,线程池中的任务还没有结束就强制进行销毁,确保应用能够被终止。
2.2 线程任务创建
// 应用启动时,启动四线程从阻塞队列消费入库请求,批量插入Mongo数据库
// 此处线程池不对外开放,仅用于执行数据入库操作
@EventListener(classes = ApplicationReadyEvent.class)
public void onApplicationReadyEvent(@NotNull ApplicationReadyEvent event) {
for (int i = 0; i < 4; i++) {
ioThreadPool.execute(this::initTerminalReportMsgThread);
}
}
// 应用关闭时,设置运行标志为false,以便正在执行的任务退出;继而执行线程池关闭操作
@EventListener(classes = ContextClosedEvent.class)
public void onApplicationCloseEvent(@NotNull ContextClosedEvent event) {
RUNNING_FLAG.set(false);
this.ioThreadPool.shutdown();
log.info("Receive context closed event({}), shutdown io thread pool", event);
}
此处在应用启动时,监听ApplicationReadyEvent事件,当应用启动完成时,创建固定的4线程任务。
2.3 任务终止判定
private void initTerminalReportMsgThread() {
List<TerminalReportMsg> terminalReportMsgs = new ArrayList<>(32);
long start = System.currentTimeMillis();
while (RUNNING_FLAG.get()) {
try {
// 尝试从阻塞队列拉取入库请求(使用poll,设置等待时间,避免长时间阻塞在此处)
TerminalReportMsg msg = Constants.replyMessageBlockingQueue.poll(10, MILLISECONDS);
if (msg != null) {
terminalReportMsgs.add(msg);
// 超过32个请求,则
if (terminalReportMsgs.size() >= 32) {
mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
terminalReportMsgs.clear();
long end = System.currentTimeMillis();
log.info("Save terminal report msg to mongo, cost {} ms", end - start);
start = end;
}
} else {
// 超过100ms,直接执行入库操作;
// 避免在较长的时间段内数据量较少(小于32),导致前序数据无法及时入库
if (System.currentTimeMillis() - start > 100) {
if (CollUtil.isNotEmpty(terminalReportMsgs)) {
mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
terminalReportMsgs.clear();
}
start = System.currentTimeMillis();
}
}
} catch (Exception e) {
log.error("Get terminal report msg or send terminal report msg failed", e);
}
}
// 应用终止,此时尽可能的拉取队列中的数据,避免数据丢失
log.info("Thread is interrupted, get all terminal report msg from queue");
while (true) {
try {
TerminalReportMsg msg = Constants.replyMessageBlockingQueue.poll(10, MILLISECONDS);
if (msg == null) {
break;
}
terminalReportMsgs.add(msg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (CollUtil.isNotEmpty(terminalReportMsgs)) {
mongoTemplate.insert(terminalReportMsgs, TerminalReportMsg.class);
}
log.info("Thread is interrupted, save all terminal report msg to mongo");
}
上述代码,是线程池中运行的数据拉取任务,具体内容可见注释描述。
3 补充
线程任务中,使用原子变量RUNNING_FLAG判断应用是否终止,并非使用interrupted的原因在于:
(1)当前线程任务是while循环,不会执行一段时间后自动退出;
(2)线程池执行shutdown的时候,正在运行的线程较难捕获到interputed标志变化(时效)。
基于以上两点,使用原子变量简化判断。