目录

CompletableFuture_组合式异步编程

本章内容

  • 创建异步计算,并获取计算结果
  • 使用非阻塞操作提升吞吐量
  • 设计和实现异步API
  • 如何以异步的方式使用同步的API
  • 如何对两个或多个异步操作进行流水线和合并操作
  • 如何处理异步操作的完成状态

最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关,尤其是它们之间如何交互。

我们在第7章中已经讨论过硬件平台的影响。我们注意到随着多核处理器的出现,提升应用程序处理速度最有效 的方式是编写能充分发挥多核能力的软件。你已经看到通过切分大型的任务,让每个子任务并行 运行,这一目标是能够实现的;你也已经了解相对直接使用线程的方式,使用分支/合并框架(在 Java 7中引入)和并行流(在Java 8中新引入)能以更简单、更有效的方式实现这一目标。

第二种趋势反映在公共API日益增长的互联网服务应用。著名的互联网大鳄们纷纷提供了自己的公共API服务,比如谷歌提供了地理信息服务,Facebook提供了社交信息服务,Twitter提供 了新闻服务。现在,很少有网站或者网络应用会以完全隔离的方式工作。更多的时候,我们看到 的下一代网络应用都采用“混聚”(mash-up)的方式:它会使用来自多个来源的内容,将这些内容聚合在一起,方便用户的生活。

比如,你可能希望为你的法国客户提供指定主题的热点报道。为实现这一功能,你需要向 谷歌或者Twitter的API请求所有语言中针对该主题最热门的评论,可能还需要依据你的内部算法 对它们的相关性进行排序。之后,你可能还需要使用谷歌的翻译服务把它们翻译成法语,甚至 利用谷歌地图服务定位出评论作者的位置信息,最终将所有这些信息聚集起来,呈现在你的网 站上。

当然,如果某些外部网络服务发生响应慢的情况,你希望依旧能为用户提供部分信息,比如提供带问号标记的通用地图,以文本的方式显示信息,而不是呆呆地显示一片空白屏幕,直到地 图服务器返回结果或者超时退出。图11-1解释了这种典型的“混聚”应用如何与所需的远程服务 交互。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814022150.png

要实现类似的服务,你需要与互联网上的多个Web服务通信。可是,你并不希望因为等待某 些服务的响应,阻塞应用程序的运行,浪费数十亿宝贵的CPU时钟周期。比如,不要因为等待 Facebook的数据,暂停对来自Twitter的数据处理。

这些场景体现了多任务程序设计的另一面。第7章中介绍的分支/合并框架以及并行流是实现 并行处理的宝贵工具;它们将一个操作切分为多个子操作,在多个不同的核、CPU甚至是机器上 并行地执行这些子操作。

与此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你 其实真正想做的是避免因为等待远程服务的返回,或者等待数据库的查询,而阻塞线程的执行, 浪费宝贵的计算资源,因为这种等待的时间很可能相当长。通过本章中你会了解,Future接口, 尤其是它的新版实现CompletableFuture,是处理这种情况的利器。图11-2说明了并行和并发 的区别。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814022328.png

Future接口

Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模它建模 了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在 Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作, 不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣 服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就 是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。Future的另一个优点是它比 更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对 象中,再将它提交给ExecutorService,就万事大吉了。下面这段代码展示了Java 8之前使用 Future的一个例子。

代码清单11-1 使用Future以异步的方式执行一个耗时的操作

https://gitee.com/lienhui68/picStore/raw/master/null/20200814022928.png

正像图11-3介绍的那样,这种编程方式让你的线程可以在ExecutorService以并发方式调 用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步 操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操 作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应 的结果。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814023017.png

你能想象这种场景存在怎样的问题吗?如果该长时间运行的操作永远不返回了会怎样?为 了处理这种可能性,虽然Future提供了一个无需任何参数的get方法,我们还是推荐大家使用重载版本的get方法,它接受一个超时的参数,通过它,你可以定义你的线程等待Future结果的最长时间,而不是像代码清单11-1中那样永无止境地等待下去。

Future接口的局限性

通过第一个例子,我们知道 Future 接口提供了方法来检测异步计算是否已经结束(使用 isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时 间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都 完成后,将计算的结果与另一个查询操作结果合并”。但是,使用Future中提供的方法完成这样 的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些。

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 等待Future集合中的所有任务都完成。
  • 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同 一个值),并返回它的结果。
  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  • 应对Future的完成事件即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

这一章中,你会了解新的CompletableFuture类(它实现了Future接口)如何利用Java 8 的新特性以更直观的方式将上述需求都变为可能。Stream和CompletableFuture的设计都遵循 了类似的模式:它们都使用了Lambda表达式以及流水线的思想。 从这个角度, 你可以说 CompletableFuture和Future的关系就跟Stream和Collection的关系一样。

使用CompleteFuture构建异步应用

为了展示 CompletableFuture 的强大特性, 我们会创建一个名为“最佳价格查询器” (best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。

  • 首先,你会学到如何为你的客户提供异步API(如果你拥有一间在线商店的话,这是非常 有帮助的)。
  • 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线 商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该 商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
  • 你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的 商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商 店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用 户可能遭遇白屏)。

同步API与异步API

同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。

与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。

实现异步API

为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。首先,商店 应该声明依据指定产品名称返回价格的方法:

1
2
3
    public double getPrice(String product) {
        // 待实现
    }

该方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其 他外部服务(比如,商店的供应商,或者跟制造商相关的推广折扣)。我们在本章剩下的内容中, 采用delay方法模拟这些长期运行的方法的执行,它会人为地引入1秒钟的延迟,方法声明如下。

代码清单11-2 模拟1秒钟延迟的方法

1
2
3
4
5
6
7
public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

为了介绍本章的内容,getPrice方法会调用delay方法,并返回一个随机计算的值,代码 清单如下所示。返回随机计算的价格这段代码看起来有些取巧。它使用charAt,依据产品的名 称,生成一个随机值作为价格。

代码清单11-3 在getPrice方法中引入一个模拟的延迟

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class Shop {
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被 阻塞。 为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对 网络中的所有商店都要重复这种操作。本章接下来的小节中,你会了解如何以异步方式使用同 步API解决这个问题。但是,出于学习如何设计异步API的考虑,我们会继续这一节的内容,假 装我们还在深受这一困难的烦扰:你是一个睿智的商店店主,你已经意识到了这种同步API会 为你的用户带来多么痛苦的体验,你希望以异步API的方式重写这段代码,让用户更流畅地访 问你的网站。

将同步方法转换为异步方法

为了实现这个目标,你首先需要将getPrice转换为getPriceAsync方法,并修改它的返 回值:

1
public Future<Double> getPriceAsync(String product) { ... }

我们在本章开头已经提到,Java 5引入了java.util.concurrent.Future接口表示一个异 步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。这意味着Future是一 个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。因为这样 的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他 有价值的计算任务。新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能 的方式轻松地实现这个方法,比如下面就是这样一段实现代码。

代码清单11-4 getPriceAsync方法的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Future<Double> getPriceAsync(String product) {
        // 创建 CompletableFuture 对象,它会包含计算的结果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        // 在另一个 线程中以 异步方式 执行计算
        new Thread(() -> {
            double price = calculatePrice(product);
            // 需长时间计算的任务结束并得出结果时,设置Future的返回值
            futurePrice.complete(price);
        }).start();
        // 无需等待还没结束的计算,直接返回Future对象
        return futurePrice;
    }

在这段代码中,你创建了一个代表异步计算的CompletableFuture对象实例,它在计算完 成时会包含计算的结果。接着,你调用fork创建了另一个线程去执行实际的价格计算工作,不 等该耗时计算任务结束,直接返回一个Future实例。当请求的产品价格最终计算得出时,你可 以使用它的complete方法,结束completableFuture对象的运行,并设置变量的值。很显然, 这个新版Future的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import static com.eh.eden.java8.demo.Demo9.delay;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        // 查询商店, 试图取得商品的价格
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime + " msecs");

        // 在计算商品价格的同时执行更多任务,比如查询其他商店
        doSomethingElse();

        // 从Future对象中读取价格, 如果价格未知,会发生阻塞
        double price = futurePrice.get();
        System.out.printf("Price is %.2f%n", price);

        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");
    }

    public static void doSomethingElse() {
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public Future<Double> getPriceAsync(String product) {
        // 创建 CompletableFuture 对象,它会包含计算的结果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        // 在另一个 线程中以 异步方式 执行计算
        new Thread(() -> {
            double price = calculatePrice(product);
            // 需长时间计算的任务结束并得出结果时,设置Future的返回值
            futurePrice.complete(price);
        }).start();
        // 无需等待还没结束的计算,直接返回Future对象
        return futurePrice;
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}
// 运行结果
Invocation returned after 74 msecs
Price is 228.21
Price returned after 1100 msecs

我们看到这段代码中,客户向商店查询了某种商品的价格。由于商店提供了异步API,该次 调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格。这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,不会呆呆地阻塞在那里等待第一家商店返回请求的结果。最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,再调用Future的get方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。

你一定已经发现getPriceAsync方法的调用返回远远早于最终价格计算完成的时间。在11.4 节中,你还会知道我们有可能避免发生客户端被阻塞的风险。实际上这非常简单,Future执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。不过,我们当下不会对此进行讨论,现在我们要解决的是另一个问题:如何正确地管理 异步任务执行过程中可能出现的错误。

错误处理

如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误 会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。这是一 种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。使用 这种方法至少能防止程序永久地等待下去, 超时发生时, 程序会得到通知发生了 Timeout- Exception。不过,也因为如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引发了这样的失效。 为了让客户端能了解商店无法提供请求商品价格的原因, 你需要使用 CompletableFuture的 completeExceptionally方法将导致 CompletableFuture内发生问题的异常抛出。对代码清单11-4优化后的结果如下所示。

代码清单11-6 抛出CompletableFuture内的异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static com.eh.eden.java8.demo.Demo9.delay;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        // 查询商店, 试图取得商品的价格
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime + " msecs");

        // 在计算商品价格的同时执行更多任务,比如查询其他商店
        doSomethingElse();

        // 从Future对象中读取价格, 如果价格未知,会发生阻塞
        double price = futurePrice.get(3, TimeUnit.SECONDS);
        System.out.printf("Price is %.2f%n", price);

        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");
    }

    public static void doSomethingElse() {
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public Future<Double> getPriceAsync(String product) {
        // 创建 CompletableFuture 对象,它会包含计算的结果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        // 在另一个 线程中以 异步方式 执行计算
        new Thread(() -> {
            try {
                // 如果价格计算正常结束,完成Future操作并设置商品价格
                double price = calculatePrice(product);
                // 需长时间计算的任务结束并得出结果时,设置Future的返回值
                futurePrice.complete(price);
            } catch (Exception e) {
                // 否则就抛出导致失败的异常,完成这次Future操作
                futurePrice.completeExceptionally(e);
            }
        }).start();
        // 无需等待还没结束的计算,直接返回Future对象
        return futurePrice;
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        if (true) {
            throw new RuntimeException("product not available");
        }
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

// 运行结果
Invocation returned after 79 msecs
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: product not available
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at com.eh.eden.java8.demo.Demo9.main(Demo9.java:40)
Caused by: java.lang.RuntimeException: product not available
	at com.eh.eden.java8.demo.Shop.calculatePrice(Demo9.java:84)
	at com.eh.eden.java8.demo.Shop.lambda$getPriceAsync$0(Demo9.java:68)
	at java.lang.Thread.run(Thread.java:745)

客户端现在会收到一个 ExecutionException 异常, 该异常接收了一个包含失败原因的 Exception参数,即价格计算方法最初抛出的异常。所以,举例来说,如果该方法抛出了一个运行时异常“product not available”,客户端就会得到像上面这样一段ExecutionException:java.lang.RuntimeException: product not available。

使用工厂方法supplyAsync创建CompletableFuture

目前为止我们已经了解了如何通过编程创建 CompletableFuture 对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间,CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。 比如,采用supplyAsync方法后,你可以用一行语句重写代码清单11-4中的getPriceAsync方 法,如下所示。

代码清单11-7 使用工厂方法supplyAsync创建CompletableFuture对象

1
2
3
    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

supplyAsync方法定义:

1
2
3
4
5
6
7
8
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture 对象,该对象在完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool 池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传 递第二个参数指定不同的执行线程执行生产者方法。一般而言,向CompletableFuture的工厂 方法传递可选参数,指定生产者方法的执行线程是可行的,在11.3.4节中,你会使用这一能力,我 们会在该小节介绍如何使用适合你应用特性的执行线程改善程序的性能

此外,代码清单11-7中 getPriceAsync 方法返回的 CompletableFuture 对象和代码清单 11-6中你手工创建和完成的CompletableFuture对象是完全等价的,这意味着它提供了同样的 错误管理机制,而前者你花费了大量的精力才得以构建。

本章的剩余部分中,我们会假设你非常不幸,无法控制Shop类提供API的具体实现,最终提供给你的API都是同步阻塞式的方法。这也是当你试图使用服务提供的HTTP API时最常发生的情况。你会学到如何以异步的方式查询多个商店,避免被单一的请求所阻塞,并由此提升你的“最佳价格查询器”的性能和吞吐量。

让你的代码免受阻塞之苦

所以, 你已经被要求进行“最佳价格查询器”应用的开发了, 不过你需要查询的所有商 店都如11.2节开始时介绍的那样,只提供了同步API。换句话说,你有一个商家的列表,如下 所示:

1
2
3
4
5
6
        List<Shop> shops = Arrays.asList(
                new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll")
        );

你需要使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表, 这个字符串列表中包括商店的名称、该商店中指定商品的价格:

1
public static List<String> findPrices(List<Shop> shops, String product) {

你的第一个想法可能是使用我们在第4、5、6章中学习的Stream特性。你可能试图写出类似 下面这个清单中的代码(是的,作为第一个方案,如果你想到这些已经相当棒了!)。

代码清单11-8 采用顺序查询所有商店的方式实现的findPrices方法

1
2
3
4
5
    public static List<String> findPrices(List<Shop> shops, String product) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

完整代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import static com.eh.eden.java8.demo.Demo9.delay;
import static java.util.stream.Collectors.toList;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {

        List<Shop> shops = Arrays.asList(
                new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("BuyItAll")
        );

        long start = System.nanoTime();
        System.out.println(findPrices(shops, "myPhone27S"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");


    }

    public static List<String> findPrices(List<Shop> shops, String product) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

好吧,这段代码看起来非常直白。现在试着用该方法去查询你最近这些天疯狂着迷的唯一产 品(是的,你已经猜到了,它就是myPhone27S)。此外,也请记录下方法的执行时间,通过这 些数据,我们可以比较优化之后的方法会带来多大的性能提升,具体的代码清单如下。

代码清单11-9 验证findPrices的正确性和执行性能

1
2
3
4
        long start = System.nanoTime();
        System.out.println(findPrices(shops, "myPhone27S"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");

运行结果:

1
2
[BestPrice price is 141.33, LetsSaveBig price is 166.25, MyFavoriteShop price is 150.74, BuyItAll price is 168.52]
Done in 4126 msecs

正如你预期的,findPrices方法的执行时间仅比4秒钟多了那么几毫秒,因为对这4个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒左右的时 间计算请求商品的价格。你怎样才能改进这个结果呢?

使用并行流对请求进行并行操作

读完第7章,你应该想到的第一个,可能也是最快的改善方法是使用并行流来避免顺序计算, 如下所示。

代码清单11-10 对findPrices进行并行操作

1
2
3
4
5
6
    public static List<String> findPrices(List<Shop> shops, String product) {
        // 使用并行流并行地从不同的商店获取价格
        return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

运行代码,与代码清单11-9的执行结果相比较,你发现了新版findPrices的改进了吧。

1
2
[BestPrice price is 135.49, LetsSaveBig price is 200.30, MyFavoriteShop price is 167.76, BuyItAll price is 138.72]
Done in 1134 msecs

相当不错啊!看起来这是个简单但有效的主意:现在对四个不同商店的查询实现了并行,所 以完成所有操作的总耗时只有1秒多一点儿。 你能做得更好吗?让我们尝试使用刚学过的 CompletableFuture,将findPrices方法中对不同商店的同步调用替换为异步调用。

使用CompleteFuture发起异步请求

你已经知道我们可以使用工厂方法supplyAsync创建CompletableFuture对象。让我们把 它利用起来:

1
2
3
4
5
List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
                .collect(toList());

使用这种方式, 你会得到一个List<CompletableFuture<String>>, 列表中的每个 CompletableFuture 对象在计算完成后都包含商店的 String 类型的名称。 但是, 由于你用 CompletableFutures实现的findPrices方法要求返回一个List<String>你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。

为了实现这个效果, 你可以向最初的 List<CompletableFuture<String>> 施加第二个 map操作,对List中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意 CompletableFuture 类中的 join 方法和 Future 接口中的 get 有相同的含义, 并且也声明在 Future接口中,它们唯一的不同是 join不会抛出任何检测到的异常。使用它你不再需要使用 try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。所有这些整合在一 起,你就可以重新实现findPrices了,具体代码如下。

代码清单11-11 使用CompletableFuture实现findPrices方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    public static List<String> findPrices(List<Shop> shops, String product) {
        // 使用CompletableFuture以异步方式计算每种商品的价格
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
                .collect(toList());
        // 等待所有异步操作结束
        return priceFutures.stream().map(CompletableFuture::join).collect(toList());
    }

注意到了吗?这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一 个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个 CompletableFuture对象只能在前一个操作结束之后创建并执行查询指定商家的动作、通知join 方法返回计算结果。图11-4解释了这些重要的细节。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814130529.png

图11-4的上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标 识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。与此相反,图的下半部分展示了如何先将CompletableFutures对象聚集到一个列表中 (即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。

运行代码清单11-11中的代码来了解下第三个版本findPrices方法的性能,你会得到下面这 几行输出:

1
2
[BestPrice price is 167.77, LetsSaveBig price is 212.44, MyFavoriteShop price is 134.72, BuyItAll price is 196.71]
Done in 2138 msecs

这个结果让人相当失望,不是吗?超过2秒意味着利用CompletableFuture实现的版本, 比刚开始代码清单11-8中原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并 行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小 的改动,就让人更加沮丧。

与此形成鲜明对比的是,我们为采用CompletableFutures完成的新版方法做了大量的工 作!但,这就是全部的真相吗?这种场景下使用CompletableFutures真的是浪费时间吗?或 者我们可能漏掉了某些重要的东西?继续往下探究之前,让我们休息几分钟,尤其是想想你测试代码的机器是否足以以并行方式运行四个线程。

如果你使用的机器足够强大,能以并行方式运行更多的线程(比如说8个线程),那你需要使用更多的商店和并行 进程,才能重现这几页中介绍的行为。

寻找更好的方案

并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家 分配一个线程。但是,如果你想要增加第五个商家到商店列表中,让你的“最佳价格查询”应用 对其进行处理,这时会发生什么情况?毫不意外,顺序执行版本的执行还是需要大约5秒多钟的 时间,下面是执行的输出:

https://gitee.com/lienhui68/picStore/raw/master/null/20200814130825.png

非常不幸,并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态的)的四个线程现在都处于繁忙状态,都在对前4个商店进行查询。第五个查询只能等到前面某一个操作完成释放出空闲线程才能继续(同步()+同步()),它的运行结果如下:

https://gitee.com/lienhui68/picStore/raw/master/null/20200814130924.png

CompletableFuture版本的程序结果如何呢?我们也试着添加第5个商店对其进行了测试, 结果如下:

https://gitee.com/lienhui68/picStore/raw/master/null/20200814131131.png

CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个 版本也不太令人满意。比如,如果你试图让你的代码处理9个商店,并行流版本耗时3143毫秒, 而CompletableFuture版本耗时3009毫秒。它们看起来不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池, 默认都使用固定数目的线程, 具体线程数取决于 Runtime. getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的 优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看你怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。

使用定制的执行器

就这个主题而言,明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取 决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?

调整线程池的大小

《Java并发编程实战》(http://mng.bz/979c)一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如现在所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算: $$ N_{threads} = N_{CPU}* U_{CPU} * (1 + W/C) $$ 其中: $N_{CPU}$ 是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到 $U_{CPU}$ 是期望的CPU利用率(该值应该介于0和1之间) $W/C$ 是等待时间与计算时间的比率

你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你 期望的CPU利用率是100%,你需要创建一个拥有400个线程的线程池。实际操作中,如果你创建 的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没 有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设 定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。

代码清单11-12 为“最优价格查询器”应用定制的执行器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    static List<Shop> shops = Arrays.asList(
            new Shop("BestPrice"),
            new Shop("LetsSaveBig"),
            new Shop("MyFavoriteShop"),
            new Shop("BuyItAll"),
            new Shop("5thShop")
    );

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 100),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

注意,你现在正创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的用户线程,所以最后剩下的那个用户线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差 异。现在,你可以将执行器作为第二个参数传递给supplyAsync工厂方法了。比如,你现在可 以按照下面的方式创建一个可查询指定商品价格的CompletableFuture对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public static List<String> findPrices(List<Shop> shops, String product) {


        // 使用CompletableFuture以异步方式计算每种商品的价格
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
                .collect(toList());
        // 等待所有异步操作结束
        return priceFutures.stream().map(CompletableFuture::join).collect(toList());
    }
// 运行结果

改进之后,使用CompletableFuture方案的程序处理5个商店仅耗时1021秒,处理9个商店 时耗时1022秒。一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值400(并不准确,需要实际测算)。 这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFutures向其提交任 务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    static final int SHOP_SIZE  = 1200;
    static List<Shop> shops = IntStream.rangeClosed(1, SHOP_SIZE).boxed().map(i -> new Shop(i + "")).collect(toList());

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 400),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

完整代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import static com.eh.eden.java8.demo.Demo9.delay;
import static java.util.stream.Collectors.toList;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public static void main(String[] args) throws Exception {


        long start = System.nanoTime();
        System.out.println(findPrices(shops, "myPhone27S"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");


    }


    static final int SHOP_SIZE  = 1200;
    static List<Shop> shops = IntStream.rangeClosed(1, SHOP_SIZE).boxed().map(i -> new Shop(i + "")).collect(toList());

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 1200),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public static List<String> findPrices(List<Shop> shops, String product) {


        // 使用CompletableFuture以异步方式计算每种商品的价格
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
                .collect(toList());
        // 等待所有异步操作结束
        return priceFutures.stream().map(CompletableFuture::join).collect(toList());
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

在线程数量小于阈值时,执行时间与商店数量成正比,比如商店数量400 执行时间约1s, 商店数量800,执行时间约2s,商店数量800,执行时间约3s。

并行——使用流还是CompletableFutures?

目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。

我们对使用这些API的建议如下。

  • 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
  • 如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

现在你已经了解了如何利用CompletableFuture为你的用户提供异步API,以及如何将一 个同步又缓慢的服务转换为异步的服务。不过到目前为止,我们每个Future中进行的都是单次 的操作。下一节中,你会看到如何将多个异步操作结合在一起,以流水线的方式运行,从描述形 式上,它与你在前面学习的Stream API有几分类似。

对多个异步任务进行流水线操作

让我们假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。你使用一个枚举型变量Discount.Code来实现这一想法,具体代码如下所示。

代码清单11-13 以枚举类型定义的折扣代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }

    } 
  	// Discount类的具体实现这里暂且不表示,参见代码清单11-14
}

我们还假设所有的商店都同意修改 getPrice 方法的返回格式。 getPrice 现在以 Shop- Name:price:DiscountCode的格式返回一个String类型的值。我们的示例实现中会返回一个 随机生成的Discount.Code,以及已经计算得出的随机价格:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public String getPrice(String product) {
        Random random = new Random();
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

调用getPrice方法可能会返回像下面这样一个String值:

BestPrice:123.26:GOLD

实现折扣服务

你的“最佳价格查询器”应用现在能从不同的商店取得商品价格,解析结果字符串,针对每 个字符串,查询折扣服务取的折扣代码。这个流程决定了请求商品的最终折扣价格(每个折扣 代码的实际折扣比率有可能发生变化,所以你每次都需要查询折扣服务)。我们已经将对商店返 回字符串的解析操作封装到了下面的Quote类之中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Quote {
    @Getter
    private final String shopName;
    @Getter
    private final double price;
    @Getter
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

通过传递shop对象返回的字符串给静态工厂方法parse,你可以得到Quote类的一个实例, 它包含了shop的名称、折扣之前的价格,以及折扣代码。

Discount服务还提供了一个applyDiscount方法,它接收一个Quote对象,返回一个字符串,表示生成该Quote的shop中的折扣价格,代码如下所示。

代码清单11-14 Discount服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // 将折扣代码应 用于商品最初 的原始价格
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    // 模 拟 Discount 服务响应的延迟
    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    }

}

使用折扣服务

和在 11.3节中一样,首先尝试以最直接的方式(坏消息是,这种方式是顺序而且同步执行的)重新实 现findPrices,以满足这些新增的需求。

代码清单11-15 以最简单的方式实现使用Discount服务的findPrices方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    public static List<String> findPrices(List<Shop> shops, String product) {
        return shops.stream()
                // 取得每个 shop 对象 中商品的原始价格
                .map(shop -> shop.getPrice(product))
                // 在 Quote 对 象 中 对 shop 返回的字 符串进行转换
                .map(Quote::parse)
                // 联系 Discount 服 务,为每个Quote 申请折扣
                .map(Discount::applyDiscount)
                .collect(toList());
    }

通过在shop构成的流上采用流水线方式执行三次map操作,我们得到了期望的结果。

  1. 第一个操作将每个shop对象转换成了一个字符串,该字符串包含了该 shop中指定商品的 价格和折扣代码。
  2. 第二个操作对这些字符串进行了解析,在Quote对象中对它们进行转换。
  3. 最终,第三个map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回该 价格及提供该价格商品的shop。

你可能已经猜到,这种实现方式的性能远非最优,不过我们还是应该测量一下。跟之前一样, 通过运行基准测试,我们得到下面的数据:

1
2
[1 price is 141.49, 2 price is 134.304, 3 price is 145.6635, 4 price is 127.63799999999999, 5 price is 151.436]
Done in 10061 msecs

完整代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import static com.eh.eden.java8.demo.Demo9.delay;
import static java.util.stream.Collectors.toList;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public static void main(String[] args) throws Exception {


        long start = System.nanoTime();
        System.out.println(findPrices(shops, "myPhone27S"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");


    }


    static final int SHOP_SIZE = 5;
    static List<Shop> shops = IntStream.rangeClosed(1, SHOP_SIZE).boxed().map(i -> new Shop(i + "")).collect(toList());

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 1200),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public static List<String> findPrices(List<Shop> shops, String product) {
        return shops.stream()
                // 取得每个 shop 对象 中商品的原始价格
                .map(shop -> shop.getPrice(product))
                // 在 Quote 对 象 中 对 shop 返回的字 符串进行转换
                .map(Quote::parse)
                // 联系 Discount 服 务,为每个Quote 申请折扣
                .map(Discount::applyDiscount)
                .collect(toList());
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getPrice(String product) {
        Random random = new Random();
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // 将折扣代码应 用于商品最初 的原始价格
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    // 模 拟 Discount 服务响应的延迟
    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    }

}

class Quote {
    @Getter
    private final String shopName;
    @Getter
    private final double price;
    @Getter
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

毫无意外,这次执行耗时10秒,因为顺序查询5个商店耗时大约5秒,现在又加上了Discount 服务为5个商店返回的价格申请折扣所消耗的5秒钟。你已经知道,把流转换为并行流的方式,非 常容易提升该程序的性能。不过,通过11.3节的介绍,你也知道这一方案在商店的数目增加时, 扩展性不好,因为Stream底层依赖的是线程数量固定的通用线程池。相反,你也知道,如果自 定义CompletableFutures调度任务执行的执行器能够更充分地利用CPU资源。

构造同步和异步操作

让我们再次使用CompletableFuture提供的特性,以异步方式重新实现findPrices方法。 详细代码如下所示。如果你发现有些内容不太熟悉,不用太担心,我们很快会进行针对性的介绍。

代码清单11-16 使用CompletableFuture实现findPrices方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static List<String> findPrices(List<Shop> shops, String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                // 以异步方式取得每个 shop 中指定产品的原始价格
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                shop.getPrice(product), executor))
                // Quote对象存在时,对
                .map(future -> future.thenApply(Quote::parse))
                // 使用另一个异步任务构造期望的 Future ,望的 Future ,其返回的值进行转换
                .map(future ->
                        future.thenCompose(quote ->
                                CompletableFuture.supplyAsync(() ->
                                        Discount.applyDiscount(quote), executor)))
                .collect(toList());

        return priceFutures.stream()
                // 等待流中的所有Future执行完毕,并提取各自的返回值
                .map(CompletableFuture::join).collect(toList());
    }
// 运行结果
[shop-1 price is 165.8, shop-2 price is 119.1955, shop-3 price is 179.07, shop-4 price is 170.02, shop-5 price is 134.6625]
Done in 2021 msecs

完整代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import static com.eh.eden.java8.demo.Demo9.delay;
import static java.util.stream.Collectors.toList;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public static void main(String[] args) throws Exception {


        long start = System.nanoTime();
        System.out.println(findPrices(shops, "myPhone27S"));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in " + duration + " msecs");


    }


    static final int SHOP_SIZE = 5;
    static List<Shop> shops = IntStream.rangeClosed(1, SHOP_SIZE).boxed().map(i -> new Shop(String.format("shop-%d", i))).collect(toList());

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 1200),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public static List<String> findPrices(List<Shop> shops, String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                // 以异步方式取得每个 shop 中指定产品的原始价格
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                shop.getPrice(product), executor))
                // Quote对象存在时,对
                .map(future -> future.thenApply(Quote::parse))
                // 使用另一个异步任务构造期望的 Future ,望的 Future ,其返回的值进行转换
                .map(future ->
                        future.thenCompose(quote ->
                                CompletableFuture.supplyAsync(() ->
                                        Discount.applyDiscount(quote), executor)))
                .collect(toList());

        return priceFutures.stream()
                // 等待流中的所有Future执行完毕,并提取各自的返回值
                .map(CompletableFuture::join).collect(toList());
    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getPrice(String product) {
        Random random = new Random();
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    private double calculatePrice(String product) {
        delay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // 将折扣代码应 用于商品最初 的原始价格
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    // 模 拟 Discount 服务响应的延迟
    private static double apply(double price, Code code) {
        delay();
        return price * (100 - code.percentage) / 100;
    }

}

class Quote {
    @Getter
    private final String shopName;
    @Getter
    private final double price;
    @Getter
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

这一次,事情看起来变得更加复杂了,所以让我们一步一步地理解到底发生了什么。这三次 转换的流程如图11-5所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814144454.png

你所进行的这三次 map 操作和代码清单11-5中的同步方案没有太大的区别, 不过你使用 CompletableFuture类提供的特性,在需要的地方把它们变成了异步操作。

  1. 获取价格

    这三个操作中的第一个你已经在本章的各个例子中见过很多次,只需要将Lambda表达式作 为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。第一个转换的结果是 一个Stream<CompletableFuture<String>>,一旦运行结束,每个CompletableFuture对 象中都会包含对应shop返回的字符串。注意,你对CompletableFuture进行了设置,用代码清 单11-12中的方法向其传递了一个订制的执行器Executor。

  2. 解析报价

    现在你需要进行第二次转换将字符串转变为订单。由于一般情况下解析操作不涉及任何远程 服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带 来太多的延迟。由于这个原因,你可以对第一步中生成的 CompletableFuture 对象调用它的 thenApply,将一个由字符串转换Quote的方法作为参数传递给它。

    注意到了吗?直到你调用的CompletableFuture执行结束,使用的thenApply方法都不会 阻塞你代码的执行。这意味着CompletableFuture最终结束运行时,你希望传递Lambda表达式 给 thenApply 方法, 将 Stream 中的每个 CompletableFuture<String>对象转换为对应的 CompletableFuture<Quote>对象。你可以把这看成是为处理CompletableFuture的结果建立了一个菜单,就像你曾经为Stream的流水线所做的事儿一样。

  3. 为计算折扣构造Future

    第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。 这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,它需 要模拟远程调用带来的延迟),出于这一原因,你也希望它能够异步执行。

    为了实现这一目标,你像第一个调用传递 getPrice 给 supplyAsync 那样,将这一操作以 Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个CompletableFuture对象。到目前为止,你已经进行了两次异步操作,用了两个不同的CompletableFutures 对象进行建模,你希望能把它们以级联的方式串接起来进行工作

    1. 从shop对象中获取价格,接着把价格转换为Quote。
    2. 拿到返回的Quote对象,将其作为参数传递给Discount服务,取得最终的折扣价格。

Java 8的 CompletableFuture API提供了名为thenCompose的方法,它就是专门为这一目的而设计的,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其 结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFutures对象,对 第 一 个 CompletableFuture 对 象 调 用 thenCompose , 并 向 其 传 递 一 个 函 数 。 当 第 一 个 CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一 个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。使用这种方 式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应 UI事件。

将这三次map操作的返回的Stream元素收集到一个列表,你就得到了一个List<CompletableFuture<String>>,等这些CompletableFuture对象最终执行完毕,你就可以像代码清 单11-11中那样利用join取得它们的返回值。

你在代码清单11-16中使用的thenCompose方法像CompletableFuture类中的其他方法一 样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async 的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任 务提交到一个线程池, 所以每个任务是由不同的线程处理的。 就这个例子而言, 第二个 CompletableFuture对象的结果取决于第一个CompletableFuture,所以无论你使用哪个版 本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少 差别。我们选择thenCompose方法的原因是因为它更高效一些,因为少了很多线程切换的开销。

将两个CompletableFuture对象整合起来,无论它们是否存在依赖

代码清单11-16中,你对一个CompletableFuture对象调用了thenCompose方法,并向其传 递 了 第 二 个 CompletableFuture , 而 第 二 个 CompletableFuture 又 需 要 使 用 第 一 个 CompletableFuture的执行结果作为输入。但是,另一种比较常见的情况是,你需要将两个完 全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结 束才开始第二项任务。

这种情况,你应该使用thenCombine方法,它接收名为BiFunction的第二参数,这个参数 定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样, thenCombine 方法也提供有一个 Async 的版本。 这里, 如果使用 thenCombineAsync 会导致 BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

1
2
3
    public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

回到我们正在运行的这个例子,你知道,有一家商店提供的价格是以欧元(EUR)计价的, 但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同 时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起 来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。用这种方式,你需要使用第 三 个 CompletableFuture 对 象 , 当 前 两 个 CompletableFuture 计 算 出 结 果 , 并 由 BiFunction方法完成合并后,由它来最终结束这一任务,代码清单如下所示。

代码清单11-17 合并两个独立的CompletableFuture对象

https://gitee.com/lienhui68/picStore/raw/master/null/20200814145937.png

这里整合的操作只是简单的乘法操作,用另一个单独的任务对其进行操作有些浪费资源,所 以你只要使用thenCombine方法,无需特别求助于异步版本的thenCombineAsync方法。图11-6 展示了代码清单11-17中创建的多个任务是如何在线程池中选择不同的线程执行的,以及它们最 终的运行结果又是如何整合的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814150254.png

对Future和CompletableFuture的回顾

前文介绍的最后两个例子,即代码清单11-16和代码清单11-17,非常清晰地呈现了相对于采 用 Java 8 之前 提 供的 Future 实 现 , CompletableFuture 版 本 实 现所 具备 的 巨大 优势 。 CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式, 非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。为了更直观地感受一下使 用CompletableFuture在代码可读性上带来的巨大提升,你可以尝试仅使用Java 7中提供的特 性,重新实现代码清单11-17的功能。代码清单11-18展示了如何实现这一效果。

https://gitee.com/lienhui68/picStore/raw/master/null/20200814150429.png

响应 CompletableFuture 的 completion 事件

本章你看到的所有示例代码都是通过在响应之前添加1秒钟的等待延迟模拟方法的远程调用。毫无疑问,现实世界中,你的应用访问各个远程服务时很可能遭遇无法预知的延迟,触发的原因多种多样,从服务器的负荷到网络的延迟,有些甚至是源于远程服务如何评估你应用的商业价值,即可能相对于其他的应用,你的应用每次查询的消耗时间更长。

由于这些原因,你希望购买的商品在某些商店的查询速度要比另一些商店更快。为了说明本章的内容,我们以下面的代码清单为例,使用randomDelay方法取代原来的固定延迟。

代码清单11-19 一个模拟生成0.5秒至2.5秒随机延迟的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    private static final Random random = new Random();

    public static void randomDelay() {
        int delay = 500 + random.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

目前为止,你实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。 而你希望的效果是,只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回 的商店(有些甚至会发生超时)。你如何实现这种更进一步的改进要求呢?

对最佳价格查询器应用的优化

你要避免的首要问题是,等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理CompletableFuture流,这样每个CompletableFuture都在为某个商店执行必要的操作。为了实现这一目标,在下面的代码清单中,你会对代码清单11-12中代码实现的第一部分进 行重构,实现findPricesStream方法来生成一个由CompletableFuture构成的流。

代码清单11-20 重构findPrices方法返回一个由Future构成的流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public static Stream<CompletableFuture<String>> findPricesStream(List<Shop> shops, String product) {
        return shops.stream()
                // 以异步方式取得每个 shop 中指定产品的原始价格
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                shop.getPrice(product), executor))
                // Quote对象存在时,对
                .map(future -> future.thenApply(Quote::parse))
                // 使用另一个异步任务构造期望的 Future ,望的 Future ,其返回的值进行转换
                .map(future ->
                        future.thenCompose(quote ->
                                CompletableFuture.supplyAsync(() ->
                                        Discount.applyDiscount(quote), executor)));

    }

现在,你为findPricesStream方法返回的Stream添加了第四个map操作,在此之前,你 已 经 在 该 方 法 内 部 调 用 了 三 次 map 。 这 个 新 添 加 的 操 作 其 实 很 简 单 , 只是在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的 返 回 值 。 Java 8 的 CompletableFuture 通 过 thenAccept 方 法 提 供 了 这 一功 能 , 它 接收 CompletableFuture执行完毕后的返回值做参数。在这里的例子中,该值是由Discount服务 返回的字符串值,它包含了提供请求商品的商店名称及折扣价格,你想要做的操作也很简单,只 是将结果打印输出:

1
findPricesStream(shops, "myPhone").map(f -> f.thenAccept(System.out::println));

注意,和你之前看到的thenCompose和thenCombine方法一样,thenAccept方法也提供 了一个异步版本,名为thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度, 从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任 务。因为你想要避免不必要的上下文切换,更重要的是你希望避免在等待线程上浪费时间,尽快 响应CompletableFuture的completion事件,所以这里没有采用异步版本。

由于 thenAccept 方法已经定义了如何处理 CompletableFuture 返回的结果, 一旦 CompletableFuture计算得到结果,它就返回一个CompletableFuture<Void>所以,map 操作返回的是一个 Stream<CompletableFuture<Void>> 。 对这个 <CompletableFuture- <Void>>对象,你能做的事非常有限,只能等待其运行结束,不过这也是你所期望的。你还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你可以把构成 Stream的所有CompletableFuture<Void>对象放到一个数组中,等待所有的任务执行完成, 代码如下所示。

代码清单11-21 响应CompletableFuture的completion事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    public static void main(String[] args) throws Exception {
        CompletableFuture[] futures = findPricesStream(shops, "myPhone")
                .map(f -> f.thenAccept(System.out::println))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }
// 运行结果
shop-1 price is 116.14
shop-4 price is 139.74
shop-5 price is 227.18
shop-3 price is 121.81
shop-2 price is 194.61
1
<A> A[] toArray(IntFunction<A[]> generator);
1
2
3
4
@FunctionalInterface
public interface IntFunction<R> {
    R apply(int value);
}

allOf工厂方法接收一个由CompletableFuture构成的数组,数组中的所有Completable- Future对象执行完成之后,它返回一个CompletableFuture<Void>对象。这意味着,如果你需 要 等 待 最 初 Stream 中 的 所 有 CompletableFuture 对 象 执 行 完 毕 , 对 allOf 方 法 返 回 的 CompletableFuture执行join操作是个不错的主意。这个方法对“最佳价格查询器”应用也是 有用的,因为你的用户可能会困惑是否后面还有一些价格没有返回,使用这个方法,你可以在执 行完毕之后打印输出一条消息“All shops returned results or timed out”。

然而在另一些场景中,你可能希望只要CompletableFuture对象数组中有任何一个执行完 毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在 这种情况下,你可以使用一个类似的工厂方法anyOf。该方法接收一个CompletableFuture对象 构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture<Object>

运行结果:

1
shop-5 price is 193.26

付诸实践

正如我们在本节开篇所讨论的,现在你可以通过代码清单11-19中的randomDelay方法模拟 远程方法调用,产生一个介于0.5秒到2.5秒的随机延迟,不再使用恒定1秒的延迟值。代码清单 11-21应用了这一改变,执行这段代码你会看到不同商店的价格不再像之前那样总是在一个时刻 返回,而是随着商店折扣价格返回的顺序逐一地打印输出。为了让这一改变的效果更加明显,我 们对代码进行了微调,在输出中打印每个价格计算所消耗的时间:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        CompletableFuture[] futures = findPricesStream(shops, "myPhone")
                .map(f -> f.thenAccept(s ->
                        System.out.println(s + " (done in " + (System.nanoTime() - start) / 1000_000 + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }

运行这段代码所产生的输出如下:

1
2
3
4
5
shop-1 price is 158.02 (done in 2064 msecs)
shop-5 price is 145.63 (done in 2939 msecs)
shop-3 price is 184.07 (done in 2965 msecs)
shop-4 price is 104.87 (done in 4074 msecs)
shop-2 price is 137.23 (done in 4538 msecs)

我们看到,由于随机延迟的效果,第一次价格查询比最慢的查询要快两倍多。

完整代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.math.BigDecimal;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.eh.eden.java8.demo.Demo9.randomDelay;
import static java.math.BigDecimal.ROUND_HALF_EVEN;
import static java.util.stream.Collectors.toList;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/13
 */
public class Demo9 {

    private static final Random random = new Random();

    public static void randomDelay() {
        int delay = 500 + random.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        CompletableFuture[] futures = findPricesStream(shops, "myPhone")
                .map(f -> f.thenAccept(s ->
                        System.out.println(s + " (done in " + (System.nanoTime() - start) / 1000_000 + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
    }


    static final int SHOP_SIZE = 5;
    static List<Shop> shops = IntStream.rangeClosed(1, SHOP_SIZE).boxed().map(i -> new Shop(String.format("shop-%d", i))).collect(toList());

    // 创建一个线程池, 线程池中线程的数目为100 和商店数目二者中较小 的一个值
    private static final Executor executor = Executors.newFixedThreadPool(
            Math.min(shops.size(), 1200),
            // 使用守护线程——这种方式不会阻止程序的关停
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public static Stream<CompletableFuture<String>> findPricesStream(List<Shop> shops, String product) {
        return shops.stream()
                // 以异步方式取得每个 shop 中指定产品的原始价格
                .map(shop ->
                        CompletableFuture.supplyAsync(() ->
                                shop.getPrice(product), executor))
                // Quote对象存在时,对
                .map(future -> future.thenApply(Quote::parse))
                // 使用另一个异步任务构造期望的 Future ,望的 Future ,其返回的值进行转换
                .map(future ->
                        future.thenCompose(quote ->
                                CompletableFuture.supplyAsync(() ->
                                        Discount.applyDiscount(quote), executor)));

    }

}

class Shop {

    @Getter
    private String name;

    public Shop(String name) {
        this.name = name;
    }

    public String getPrice(String product) {
        Random random = new Random();
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    private double calculatePrice(String product) {
        randomDelay();
        Random random = new Random();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // 将折扣代码应 用于商品最初 的原始价格
    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    // 模 拟 Discount 服务响应的延迟
    private static double apply(double price, Code code) {
        randomDelay();
        return new BigDecimal(price * (100 - code.percentage) / 100).setScale(2, ROUND_HALF_EVEN).doubleValue();
    }

}

class Quote {
    @Getter
    private final String shopName;
    @Getter
    private final double price;
    @Getter
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

小结

这一章中,你学到的内容如下。

  • 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
  • 你应该尽可能地为客户提供异步API。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标。
  • CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。
  • 将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。
  • 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个。
  • 你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序。
  • 你可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。