以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了

前言

大家好,我是小郭,之前分享了CountDownLatch的使用,我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。

万万没想到,在生产环境中竟然翻车了,因为没有考虑到一些场景,导致了CountDownLatch出现了问题,接下来来分享一下由于CountDownLatch导致的问题。

需求背景

先简单介绍下业务场景,针对用户批量下载的文件进行修改上传

为了提高执行的速度,所以在采用线程池去执行 下载-修改-上传 的操作,并在全部执行完之后统一提交保存文件地址到数据库,于是加入了CountDownLatch来进行控制。

具体实现

根据服务本身情况,自定义一个线程池

public static ExecutorService testExtcutor() {        return new ThreadPoolExecutor(                2,                2,                0L,                TimeUnit.SECONDS,                new LinkedBlockingQueue<>(1));    }复制代码

模拟执行

public static void main(String[] args) {        // 下载文件总数        List resultList = new ArrayList<>(100);        IntStream.range(0,100).forEach(resultList::add);        // 下载文件分段        List> split = CollUtil.split(resultList, 10);        ExecutorService executorService = BaseThreadPoolExector.testExtcutor();        CountDownLatch countDownLatch = new CountDownLatch(100);        for (List list : split) {            executorService.execute(() -> {                list.forEach(i ->{                    try {                        // 模拟业务操作                        Thread.sleep(500);                        System.out.println("任务进入");                    } catch (InterruptedException e) {                        e.printStackTrace();                        System.out.println(e.getMessage());                    } finally {                        System.out.println(countDownLatch.getCount());                        countDownLatch.countDown();                    }                });            });        }        try {            countDownLatch.await();            System.out.println("countDownLatch.await()");        } catch (InterruptedException e) {            e.printStackTrace();        }    }复制代码

一开始我个人感觉没有什么问题,反正finally都能够做减一的操作,到最后调用await方法,进行主线程任务

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)  at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)  at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown任务进入countDownLatch.countDown复制代码

由于任务数量较多,阻塞队列中已经塞满了,所以默认的拒绝策略,当队列满时,处理策略报错异常,

要注意这个异常是线程池,自己抛出的,不是我们循环里面打印出来的,

这也造成了,线上这个线程池被阻塞了,他永远也调用不到await方法,

利用jstack,我们就能够看到有问题

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000]   java.lang.Thread.State: WAITING (parking)at sun.misc.Unsafe.park(Native Method)- parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000]   java.lang.Thread.State: WAITING (parking)at sun.misc.Unsafe.park(Native Method)- parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

解决方案

  1. 调大阻塞队列,但是问题来了,到底多少阻塞队列才是大呢,如果太大了会不由又造成内存溢出等其他的问题
  2. 在第一个的基础上,我们修改了拒绝策略,当触发拒绝策略的时候,用调用者所在的线程来执行任务
  3. public static ThreadPoolExecutor queueExecutor(BlockingQueue workQueue){ return new ThreadPoolExecutor( size, size, 0L, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy()); } 复制代码
  4. 你可能又会想说,会不会任务数量太多,导致调用者所在的线程执行不过来,任务提交的性能急剧下降
  5. 那我们就应该自定义拒绝策略,将这下排队的消息记录下来,采用补偿机制的方式去执行
  6. 同时也要注意上面的那个异常是线程池抛出来的,我们自己也需要将线程池进行try catch,记录问题数据,并且在finally中执行countDownLatch.countDown来避免,线程池的使用

总结

目前根据业务部门的反馈,业务实际中任务数不很特别多的情况,所以暂时先采用了第二种方式去解决这个线上问题

在这里我们也可以看到,如果没有正确的关闭countDownLatch,可能会导致一直等待,这也是我们需要注意的。

工具虽然好,但是依然要注意他带来的问题,没有正确的去处理好,引发的一系列连锁反应。

展开阅读全文

页面更新:2024-05-12

标签:队列   线程   主线   场景   没想到   异常   策略   代码   操作   环境   业务   文件

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top