2019年6月

多线程情况下不正确的编码引起的内存泄漏

那年冬天, 新上线了一个处理在线消息的应用, 这个应用上线没多久, 就被发现有内存泄漏的问题. 从verbose GC log 来看, 是内存 heap 被用光了. 于是进一步做了一个 heap dump, 发现里面有个 ConcurrentHashMap 占用了大部分 heap 还不释放. 看上去是这个 ConcurrentHashMap 引起的问题.

这个 ConcurrentHashMap 是这么定义的:

Map<Runnable, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

它的 key 是一个将要执行的 task, value 是一个已经被 scheduled 的 Future 对象. 仔细观察这个 futures 里面的 ScheduledFuture 元素, 发现里面有一大部分全部都是已经被执行过的(通过Future对象的 status 字段可以判断它当前的状态), 不过按照代码的思路, 如果被执行过之后, 它就会被从这个 futures 里面移除, 可是这里却有大量应该被移除, 却没有被移除的 ScheduledFuture 对象.

代码稍微有点复杂, 那个类大概1千行左右代码. 不过这里我们只把跟问题相关的代码描述一下, 另外加了一个main函数做测试, 代码如下:

public class TaskProcessor {
    
    private TaskProcessor(){};
    private static final TaskProcessor instance = new TaskProcessor();
    public static TaskProcessor getInstance() {//单例
        return instance;
    }
    
    private static final ScheduledExecutorService ses = Executors.newScheduledThreadPool(3);
    private static final Map<Runnable, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    
    public void submitTask(Runnable task, int latency) {//提交任务, 并且放入 map
        futures.put(task, ses.schedule(new TaskProcessor.Request(task), latency, TimeUnit.MILLISECONDS));
    }
    
    @PreDestroy
    public void shutdown() {//善后
        ses.shutdown();
    }
    
    /**
     *  internal class wrapper a Runnable 封装任务
     */
    private final class Request implements Runnable {
        
        private final Runnable runnable;

        private Request(final Runnable runnable) { this.runnable = runnable;}
        
        public void run() {//真正执行之前先移除
            ScheduledFuture<?> future = futures.remove(runnable);//remove the task from futures queue
            if (null == future) {
                System.out.println("running in a task");
            }
        }
    }
    
    public static void main(String[] args) {//做测试
        TaskProcessor processor = TaskProcessor.getInstance();
        
        for (int i = 0; i < 50000; i++) {//添加5万个任务
            Runnable r = new Runnable() {

                @Override
                public void run() {//做点打印的任务, 防止代码被优化掉
                    System.out.print("-");
                }
            };
            
            processor.submitTask(r, i % 2);//延迟要么0, 要么1
        }
        
        System.out.println(futures.size());//先打印一次队列长度
        
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(futures.size());//8s之后在打印, 正常已经全部执行完
        
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(futures.size());//再8s之后在打印, 正常已经全部执行完
        
        processor.shutdown();
    }
}

这段代码主要描述这么一个事情: 有任务来的时候, 把任务封装到一个 Request 里面, 然后把它 schedule 将来的某个时间去执行. schedule 动作同步返回一个 ScheduledFuture 实例, 为了统计或反映将要执行的task 数量, 把返回的 ScheduledFuture 实例放到了一个 futures 的ConcurrentHashMap 里面. 当真的要执行的时候, 再把它从 futures 这个map 里面移除. 看上去很简单的一个操作, 就是为了算一下在将要执行队列里面的任务, 加了一个这么 futures.

问题也出现在这个 futures 里面. 进一步对 heap 分析, 那些执行过的 ScheduledFuture, 已经都从 scheduler 内部的队列中移除了, 可是没有从这个futures Map 中移除.

仔细看一下这个代码, 如果说 schedule 一个task的时候, 把延迟设置成0或者非常短, 那么这个问题就很有可能出现. 写代码的人期望的顺序是:

ScheduledFuture future = ses.schedule(new TaskProcessor.Request(task), 0, TimeUnit.MILLISECONDS);  //添加将来任务 在schedule task的线程执行
futures.put(task, future);  //放入map, 在schedule task的线程执行
futures.remove(task) //在map中移除, 在scheduler的worker 执行

而实际执行的时间先后顺序可能是:

ScheduledFuture future = ses.schedule(new TaskProcessor.Request(task), 0, TimeUnit.MILLISECONDS);  //1 schedule task的线程执行
futures.remove(task) //3 在scheduler的worker 执行, 可是这里还没有加到Map
futures.put(task, future);  //2 schedule task的线程执行, 加到map

所以, 这个是没有考虑到多线程情况下, 代码同步可能引起的问题.