CompletableFuture异步批处理

前言

在java8的环境下,CompletableFuture是非常受大家喜欢的api,其强大的异步编排能力是做应用服务的绝佳助手。这个api里面包含了大几十个方法,咱们不挨着展开,本篇默认的都是对这个api有一些基础的小伙伴。

CompletableFuture



CompletableFuture异步批处理

使用场景

现在有如下场景。前端传递一个用户对象的集合,拿到之后需要后端挨个处理集合中的对象,且每个对象的操作都非常地耗时,例如需要三秒左右,每一个处理完之后会返回一个结果,我需要汇总这些结果返回给前端。示意图大概如下:

CompletableFuture异步批处理

挨个处理完每个对象之后,已经花费了10s,这还没算上此次其它的操作。假设业务上对这个接口的要求是5s,那么显然这样做就不满足要求了,那怎么办呢,也比较容易想到,每个处理对象的操作异步进行,最终把结果汇总下就行了。

思路如下:

CompletableFuture异步批处理

结合CompletableFuture实现业务

新建user类:

import lombok.Data;

/**
* @author : wuwensheng
* @date : 10:47 2021/12/13
*/
@Data
public class User {
   private String name;

   private int age;

   public User(String name, int age) {
       this.name = name;
       this.age = age;
   }
}
复制代码

springboot整合测试:

import com.teligen.PhoneApplication;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
 * 异步批处理的类,可以有相关的许多变种
 *
 * @author : wuwensheng
 * @date : 10:36 2021/12/13
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PhoneApplication.class)
@Slf4j
public class CompletableFutureTest {

    @Autowired
    private ThreadPoolExecutor customThreadPoolExecutor;
    
}
复制代码

customThreadPoolExecutor这个对象是我的线程池。

每个对user的处理都是一个CompletableFuture,如下:

public CompletableFuture disposeUser(User user) {
    return CompletableFuture.supplyAsync(new Supplier() {

        @Override
        public Integer get() {
            log.info("Thread name:{}", Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return user.getAge() + 10;
        }
    }, customThreadPoolExecutor).handleAsync(new BiFunction() {
        @Override
        public Integer apply(Integer param, Throwable throwable) {
            int result = param;
            if (throwable == null) {
                result = param * 2;
            } else {
                log.info("throwable is:{}", throwable.getMessage());
            }
            return result;
        }
    });
}
复制代码

这里处理每一个user都让当前线程沉睡了两秒,用来模拟处理业务所花费的时间。

继续编排下:

@Test
public void userTest() {
    List users = new ArrayList<>();
    users.add(new User("小明", 3));
    users.add(new User("小红", 2));
    users.add(new User("小芳", 18));
    CompletableFuture[] completableFutures = users.stream().map(user -> {

        return disposeUser(user);

    }).toArray(CompletableFuture[]::new);
    // 等待所有任务执行完
    CompletableFuture.allOf(completableFutures).join();

    for (CompletableFuture completableFuture : completableFutures) {
        try {
            log.info("result:{}", completableFuture.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

跑一下看看结果:

CompletableFuture异步批处理

ok了,57秒开始处理,59秒处理完毕。任务在join之后的确是并行的。这是一种什么感觉呢,所有异步线程出去办事了,有一辆车等着它们回来,最后一个人回来的时候,那便发车。

咱们再验证下。当处理小芳的时候沉睡5秒,看下结果:

CompletableFuture异步批处理

CompletableFuture异步批处理

这次返回耗费了5秒左右,处理得最慢的那个线程决定了最终的返回时长,这也符合咱们的预期。 大家在处理集合数据并且每一条的处理都比较耗时的话,可以考虑这个手法。


作者:119_115_104_104_201
链接:https://juejin.cn/post/7041087572461748232
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

展开阅读全文

页面更新:2024-04-28

标签:批处理   线程   函数   场景   接口   对象   阶段   操作   代码   代表   业务

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top