ListenableFuture和CompletableFuture简单小结
发布日期:2021-05-08 01:05:30 浏览次数:19 分类:精选文章

本文共 7647 字,大约阅读时间需要 25 分钟。

前言


最近花了点时间熟悉了下ListenableFuture和CompletableFuture的使用。二者都是原生JDK中老版Future-Get模式的改进。本文将结合demo程序来直观的学习一下这两大Future的使用特点。

老版Future模式的缺点


老版Future模式一个最大的问题是需要获取结果做后续处理操作的时候,还是需要阻塞等待。这样的话,和同步调用方式就没有多大区别了。而ListenableFuture和CompletableFuture对于这种情况则是提供了很多易用的API。

如果说按照先后顺序来讲的话,首先是ListenableFuture,这是由Google Guava工具包提供的Future扩展类,随后,JDK在1.8版本中马上也提供了类似这样的类,就是CompletableFuture。

ListenableFuture

先来聊聊ListenableFuture,一句话概括ListenableFuture和JDK原生Future最大的区别是前者做到了一个可以监听结果的Future。换个更通俗的讲法,就是它可以监听异步执行的过程,执行完了,自动触发什么操作。除此之外,可以分别针对成功的情况,或者失败的情况做各种后续处理。具体使用可以看下面笔者写的demo程序。

package import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executors;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.util.concurrent.FutureCallback;import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import com.google.common.util.concurrent.ListeningExecutorService;import com.google.common.util.concurrent.MoreExecutors;/** * The unit test for ListenableFuture/CompletableFuture.  * Created by yiqun01.lin * on 2018/5/3. */public class TestFutures {     //线程池中线程个数  private static final int POOL_SIZE = 50;  //带有回调机制的线程池  private static final ListeningExecutorService service = MoreExecutors          .listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));  private static Logger LOG = LoggerFactory.getLogger(TestFutures.class);  @Test  public void testListenableFuture() {    final List
value = Collections .synchronizedList(new ArrayList
()); try { List
> futures = new ArrayList
>(); // 将实现了callable的任务放入到线程池中,得到一个带有回调机制的ListenableFuture实例, // 通过Futures.addCallback方法对得到的ListenableFuture实例进行监听,一旦得到结果就进入到onSuccess方法中, // 在onSuccess方法中将查询的结果存入到集合中 for (int i = 0; i < 1; i++) { final int index = i; if (i == 9) { Thread.sleep(500 * i); } ListenableFuture
sfuture = service .submit(new Callable
() { @Override public String call() throws Exception { long time = System.currentTimeMillis(); LOG.info("Finishing sleeping task{}: {}", index, time); return String.valueOf(time); } }); sfuture.addListener(new Runnable() { @Override public void run() { LOG.info("Listener be triggered for task{}.", index); } }, service); Futures.addCallback(sfuture, new FutureCallback
() { public void onSuccess(String result) { LOG.info("Add result value into value list {}.", result); value.add(result); } public void onFailure(Throwable t) { LOG.info("Add result value into value list error.", t); throw new RuntimeException(t); } }); // 将每一次查询得到的ListenableFuture放入到集合中 futures.add(sfuture); } // 这里将集合中的若干ListenableFuture形成一个新的ListenableFuture // 目的是为了异步阻塞,直到所有的ListenableFuture都得到结果才继续当前线程 // 这里的时间取的是所有任务中用时最长的一个 ListenableFuture
> allAsList = Futures.allAsList(futures); allAsList.get(); LOG.info("All sub-task are finished."); } catch (Exception ignored) { } } @Test public void testCompletableFuture() throws Exception { ... }}

根据测试输出结果,来验证其中的执行顺序,是不是我们预期的那样。

2018-05-19 11:06:34,870 [pool-1-thread-1] INFO  records.TestFutures (TestFutures.java:call(53)) - Finishing sleeping task0: 15266991948682018-05-19 11:06:34,874 [pool-1-thread-2] INFO  records.TestFutures (TestFutures.java:run(60)) - Listener be triggered for task0.2018-05-19 11:06:34,896 [main] INFO  records.TestFutures (TestFutures.java:onSuccess(66)) - Add result value into value list 1526699194868.2018-05-19 11:06:34,924 [main] INFO  records.TestFutures (TestFutures.java:testListenableFuture(84)) - All sub-task are finished.

CompletableFuture


我们再来看看CompletableFuture的使用,这个是在JDK8中开始引入的,这个在一定程度上与ListenableFuture非常类似。比如说ListenableFuture的listener监听回调,在这个类中,相当于thenRun或者whneComplete操作原语。CompletableFuture提供的API其实有很多,从大的方向上来划分的话,有下面几类:

public static CompletableFuture
runAsync(Runnable runnable) public static CompletableFuture
runAsync(Runnable runnable, Executor executor) public static
CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

注意到这里,runAsync是不带类型返回的,Void,而supplyAsync API需要传入类型的,整型,字符串或者其它,然后是否需要在额外的线程池里执行这些Async操作,如果没有指定,会默认在ForkJoinPool提供的common pool里跑。

同样笔者也写了一个简单的demo程序:

package import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executors;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.util.concurrent.FutureCallback;import com.google.common.util.concurrent.Futures;import com.google.common.util.concurrent.ListenableFuture;import com.google.common.util.concurrent.ListeningExecutorService;import com.google.common.util.concurrent.MoreExecutors;/** * The unit test for ListenableFuture/CompletableFuture.  * Created by yiqun01.lin * on 2018/5/3. */public class TestFutures {     //线程池中线程个数  private static final int POOL_SIZE = 50;  //带有回调机制的线程池  private static final ListeningExecutorService service = MoreExecutors          .listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));  private static Logger LOG = LoggerFactory.getLogger(TestFutures.class);  @Test  public void testListenableFuture() {    ...  }  @Test  public void testCompletableFuture() throws Exception {    // case1: supplyAsync    CompletableFuture
future = CompletableFuture.supplyAsync(() -> { LOG.info("Run supplyAsync."); return "Return result of Supply Async"; }); // case2: thenRun,与supplyAsync同线程 future.thenRun(new Runnable() { @Override public void run() { LOG.info("Run action."); } }); // case2: thenRunAsync,另启动线程执行 future.thenRunAsync(new Runnable() { @Override public void run() { LOG.info("Run async action."); } }); // 主动触发Complete结束方法 // future.complete("Manual complete value."); future.whenComplete((v, e) -> { LOG.info("WhenComplete value: " + v); LOG.info("WhenComplete exception: " + e); }); CompletableFuture
future2 = CompletableFuture.runAsync(() -> { LOG.info("Return result of Run Async."); }); CompletableFuture
future3 = CompletableFuture.supplyAsync(() -> { return "hello"; }); CompletableFuture
future4 = CompletableFuture.supplyAsync(() -> { return "world"; }); CompletableFuture
f = future3.thenCombine(future4, (x, y) -> x + "-" + y); LOG.info(f.get()); }}

测试输出结果:

2018-05-19 11:16:36,358 [ForkJoinPool.commonPool-worker-1] INFO  records.TestFutures (TestFutures.java:lambda$0(93)) - Run supplyAsync.2018-05-19 11:16:36,381 [main] INFO  records.TestFutures (TestFutures.java:run(102)) - Run action.2018-05-19 11:16:36,393 [ForkJoinPool.commonPool-worker-1] INFO  records.TestFutures (TestFutures.java:run(111)) - Run async action.2018-05-19 11:16:36,394 [main] INFO  records.TestFutures (TestFutures.java:lambda$1(118)) - WhenComplete value: Return result of Supply Async2018-05-19 11:16:36,394 [main] INFO  records.TestFutures (TestFutures.java:lambda$1(119)) - WhenComplete exception: null2018-05-19 11:16:36,396 [ForkJoinPool.commonPool-worker-1] INFO  records.TestFutures (TestFutures.java:lambda$2(122)) - Return result of Run Async.2018-05-19 11:16:36,397 [main] INFO  records.TestFutures (TestFutures.java:testCompletableFuture(133)) - hello-world

这些API使用起来还是非常灵活的,大家可以自行本地继续调试调试,包括哪些是阻塞执行的,哪些是异步的,哪些是需要额外开线程执行的等等。

上一篇:HDFS文件目录list操作加速优化
下一篇:公司如何使用开源软件

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年04月16日 04时01分17秒