在java8的环境下,CompletableFuture是非常受大家喜欢的api,其强大的异步编排能力是做应用服务的绝佳助手。这个api里面包含了大几十个方法,咱们不挨着展开,本篇默认的都是对这个api有一些基础的小伙伴。
现在有如下场景。前端传递一个用户对象的集合,拿到之后需要后端挨个处理集合中的对象,且每个对象的操作都非常地耗时,例如需要三秒左右,每一个处理完之后会返回一个结果,我需要汇总这些结果返回给前端。示意图大概如下:
挨个处理完每个对象之后,已经花费了10s,这还没算上此次其它的操作。假设业务上对这个接口的要求是5s,那么显然这样做就不满足要求了,那怎么办呢,也比较容易想到,每个处理对象的操作异步进行,最终把结果汇总下就行了。
思路如下:
新建user类:
import lombok.Data;
/**
* @author : wuwensheng
* @date : 10:47 2021/12/13
*/
@Data
public class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
}
复制代码
springboot整合测试:
import com.teligen.PhoneApplication;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;
import java.util.function.Supplier;
/**
* 异步批处理的类,可以有相关的许多变种
*
* @author : wuwensheng
* @date : 10:36 2021/12/13
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = PhoneApplication.class)
@Slf4j
public class CompletableFutureTest {
@Autowired
private ThreadPoolExecutor customThreadPoolExecutor;
}
复制代码
customThreadPoolExecutor这个对象是我的线程池。
每个对user的处理都是一个CompletableFuture,如下:
public CompletableFuture disposeUser(User user) {
return CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
log.info("Thread name:{}", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return user.getAge() + 10;
}
}, customThreadPoolExecutor).handleAsync(new BiFunction() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = param;
if (throwable == null) {
result = param * 2;
} else {
log.info("throwable is:{}", throwable.getMessage());
}
return result;
}
});
}
复制代码
这里处理每一个user都让当前线程沉睡了两秒,用来模拟处理业务所花费的时间。
继续编排下:
@Test
public void userTest() {
List users = new ArrayList<>();
users.add(new User("小明", 3));
users.add(new User("小红", 2));
users.add(new User("小芳", 18));
CompletableFuture[] completableFutures = users.stream().map(user -> {
return disposeUser(user);
}).toArray(CompletableFuture[]::new);
// 等待所有任务执行完
CompletableFuture.allOf(completableFutures).join();
for (CompletableFuture completableFuture : completableFutures) {
try {
log.info("result:{}", completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
跑一下看看结果:
ok了,57秒开始处理,59秒处理完毕。任务在join之后的确是并行的。这是一种什么感觉呢,所有异步线程出去办事了,有一辆车等着它们回来,最后一个人回来的时候,那便发车。
咱们再验证下。当处理小芳的时候沉睡5秒,看下结果:
这次返回耗费了5秒左右,处理得最慢的那个线程决定了最终的返回时长,这也符合咱们的预期。 大家在处理集合数据并且每一条的处理都比较耗时的话,可以考虑这个手法。
作者:119_115_104_104_201
链接:https://juejin.cn/post/7041087572461748232
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
页面更新:2024-04-28
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号