目录

并行数据处理与性能

本章内容

  • 用并行流并行处理数据
  • 并行流的性能分析
  • 分支/合并框架
  • 使用Spliterator分割流

在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结 构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候 对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起 来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。我们会在7.2节探 讨这一框架。

在本章中,你将了解Stream接口如何让你不用太费力气就能对数据集执行并行操作。它允 许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说, 流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的 很重要,因为如果你忽视这一方面,就可能因误用而得到意外的(很可能是错的)结果。

我们会特别演示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰 是这些错误且无法解释的结果的根源。 因此, 你将会学习如何通过实现和使用你自己的 Spliterator来控制这个划分过程。

并行流

在第4章中,我们简要地提到了Stream接口可以让你非常方便地处理它的元素:可以通过对 收集源调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷 分配给多核处理器的所有内核,让它们都忙起来。

将顺序流转换为并行流

parallel

Stream在内部分成了几块。因此可以对不同的块独立并行进行归纳操作,如图7-1所示。 最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812005714.png

请注意,在现实中,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它 在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并 行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。请注意,你 可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要 顺序执行。例如,你可以这样做:

1
stream.parallel().filter(...) .sequential() .map(...) .parallel() .reduce();

但最后一次parallel或sequential调用会影响整个流水线。在本例中,流水线会并行执 行,因为最后调用的是它。

配置并行流使用的线程池

看看流的parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?

并行流内部使用了默认的ForkJoinPool(7.2节会进一步讲到分支/合并框架),它默认的线 程 数 量 就 是 你 的 处 理 器 数 量 , 这 个 值 是 由 Runtime.getRuntime().availableProcessors()得到的。

但 是 你 可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

测量流性能

迭代式、顺序归纳和并行归纳

测试框架

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Demo7 {
    public static void main(String[] args) {
        Demo7 test = new Demo7();
        System.out.println("sum done in:" + test.measureSumPerf(test::sum, 10_000_000_000L));
    }

    public long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println(i + ": " + duration);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

    public long sum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }
}

Stream.iterate生成的流求和

  • iterate生成的是装箱的对象,必须拆箱成数字才能求和;
  • 我们很难把iterate分成多个独立块来并行执行。

第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。具体来 说,iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结 果,如图7-2所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812011622.png

LongStream.rangeClosed

  • LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。
  • LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每 个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间 移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用 并行Stream加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。让我们 来看一个常见的陷阱。

错用并行流示例

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812012049.png

高效使用并行流

一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为 任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建 议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错 特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并 行流。

  • 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中 已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所 以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查 其性能

  • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、 LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。

  • 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元 素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性 能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成 无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用 limit可能会比单个有序流(比如数据源是一个List)更高效。

  • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过 流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味 着使用并行流时性能好的可能性比较大。

  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素 的好处还抵不上并行化造成的额外开销。

  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList 高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。 最后,你将在7.3节中学到,你可以自己实现 Spliterator来完全掌控分解过程。

  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。 例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处 理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。

  • 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方 如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通 过并行流得到的性能提升。

    表7-1按照可分解性总结了一些流数据源适不适于并行。

    https://gitee.com/lienhui68/picStore/raw/master/null/20200812012704.png

分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任 务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给 线程池(称为ForkJoinPool)中的工作线程。

使用RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以 及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当 然它可能会更新其他非局部机构)。 要定义 RecursiveTask, 只需实现它唯一的抽象方法 compute:

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成 单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

1
2
3
4
5
6
7
if (任务足够小或不可分) { 
  顺序计算该任务 
} else { 
  将任务分成两个子任务 
    递归调用本方法拆分每个子任务等待所有子任务完成 
    合并每个子任务的结果 
}

一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做 出这一决定。我们会在7.2.1节中进一步澄清。递归的任务拆分过程如图7-3所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812013441.png

你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并 框架的实际例子,还以前面的例子为基础,让我们试着用这个框架为一个数字范围(这里用一个 long[]数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码 清单中的ForkJoinSumCalculator。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.eh.eden.java8.demo;

import java.util.concurrent.RecursiveTask;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/12
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    private final long[] numbers;
    private final int start;
    private final int end;

    // 不再将任务分 解为子任务的 组大小
    public static final long THRESHOLD = 10_000;

    // 公共构造 函数用于 创建主任务
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    // 私有构造函数用于以递归方式为主任务创建子任务
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start; // 无需+1, 计算子任务时左闭右开,不包括end
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        // 创建一个子任 务来为数组的 前一半求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        // 使用另一个ForkJoinPool中的线程异步执行新创建的子任务
        leftTask.fork();
        // 创建一个子任 务来为数组的 后一半求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
        // 同步执行第二个子 任务,有可能允许进一步递归划分
        Long rightResult = rightTask.compute();
        // 读取第一个子任务的结果, 如果尚未完成就等待
        Long leftResult = leftTask.join();
        // 该任务的结果是两个子任务结果的组合
        return leftResult + rightResult;
    }

    // 在子任务不再可分时计算结果的简单算法
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给 ForkJoinSumCalculator的构造函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Demo7 {
    public static void main(String[] args) {
//        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1");
        Demo7 test = new Demo7();
        System.out.println("sum done in:" + test.measureSumPerf(Demo7::forkJoinSum, 100_000_000L));
    }

    public long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println(i + ": " + duration);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(0, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

这个性能看起来比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个 long[],之后才能在ForkJoinSumCalculator任务中使用它。

请注意在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个原因,一 般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任 何部分方便地重用了。。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM 能够使用的所有处理器。更确切地说,该构造函数将使用Runtime.availableProcessors的 返回值来决定线程池使用的线程数。请注意availableProcessors方法虽然看起来是处理器, 但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核

运行ForkJoinSumCalculator

当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由池中的一个线程 执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不 够小则会把要求和的数组分成两半, 分给两个新的 ForkJoinSumCalculator , 而它们也由 ForkJoinPool安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足 不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计 算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合 并每个子任务的部分结果,从而得到总任务的结果。这一过程如图7-4所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812020018.png

使用分支/合并框架的最佳做法

虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的 最佳做法。

  • 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子 任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂, 因为每个子任务都必须等待另一个子任务完成才能启动。
  • 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直 接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
  • 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
  • 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面 看栈跟踪(stack trace)来找问题,但放在分支/合并计算上就不行了,因为调用compute 的线程并不是概念上的调用方,后者是调用fork的那个。
  • 和并行流一样, 你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计 算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时 有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方 法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出 同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就 像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编 译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么 做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死 码分析——删去从未被使用的计算)。

对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步 拆分还是已小到可以顺序求值。我们会在下一节中就此给出一些提示。

工作窃取

在ForkJoinSumCalculator的例子中,我们决定在要求和的数组中最多包含10 000个项目 时就不再创建子任务了。这个选择是很随意的,但大多数情况下也很难找到一个好的启发式方法 来确定它,只能试几个不同的值来尝试优化它。在我们的测试案例中,我们先用了一个有1000 万项目的数组,意味着ForkJoinSumCalculator至少会分出1000个子任务来。这似乎有点浪费 资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所 有的任务都受CPU约束,预计所花的时间也差不多。

但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时, 应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每 个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如 磁盘访问慢,或是需要和外部服务协调执行。

分支/合并框架工程用一种称为工作窃取(work stealing)的工作来解决这个问题。在实际应 用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分 配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执 行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经 空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队 列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队 列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程 之间平衡负载。

一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。图7-5展示 了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线 程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812021442.png

现在你应该清楚流如何使用分支/合并框架来并行处理它的项目了,不过还有一点没有讲。 本节中我们分析了一个例子,你明确地指定了将数字数组拆分成多个任务的逻辑。但是,使用本 章前面讲的并行流时就用不着这么做了,这就意味着,肯定有一种自动机制来为你拆分流。这种 新的自动机制称为Spliterator,我们会在下一节中讨论。

Spliterator

Spliterator 是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行 而设计的。虽然在实践中可能用不着自己开发Spliterator,但了解一下它的实现方式会让你 对并行流的工作原理有更深入的了解。Java 8已经为集合框架中包含的所有数据结构提供了一个 默认的Spliterator实现。

eg:ArrayList

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class ArrayList<E> extends AbstractList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
  
  ...
    
  @Override
  public Spliterator<E> spliterator() {
      return new ArrayListSpliterator<>(this, 0, -1, 0);
  }

  /** Index-based split-by-two, lazily initialized Spliterator */
  static final class ArrayListSpliterator<E> implements Spliterator<E> {
      private final ArrayList<E> list;
      private int index; // current index, modified on advance/split
      private int fence; // -1 until used; then one past last index
      private int expectedModCount; // initialized when fence set

      /** Create new spliterator covering the given  range */
      ArrayListSpliterator(ArrayList<E> list, int origin, int fence,
                           int expectedModCount) {
          this.list = list; // OK if null unless traversed
          this.index = origin;
          this.fence = fence;
          this.expectedModCount = expectedModCount;
      }

      private int getFence() { // initialize fence to size on first use
          int hi; // (a specialized variant appears in method forEach)
          ArrayList<E> lst;
          if ((hi = fence) < 0) {
              if ((lst = list) == null)
                  hi = fence = 0;
              else {
                  expectedModCount = lst.modCount;
                  hi = fence = lst.size;
              }
          }
          return hi;
      }

      public ArrayListSpliterator<E> trySplit() {
          int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
          return (lo >= mid) ? null : // divide range in half unless too small
              new ArrayListSpliterator<E>(list, lo, index = mid,
                                          expectedModCount);
      }

      public boolean tryAdvance(Consumer<? super E> action) {
          if (action == null)
              throw new NullPointerException();
          int hi = getFence(), i = index;
          if (i < hi) {
              index = i + 1;
              @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
              action.accept(e);
              if (list.modCount != expectedModCount)
                  throw new ConcurrentModificationException();
              return true;
          }
          return false;
      }

      public void forEachRemaining(Consumer<? super E> action) {
          int i, hi, mc; // hoist accesses and checks from loop
          ArrayList<E> lst; Object[] a;
          if (action == null)
              throw new NullPointerException();
          if ((lst = list) != null && (a = lst.elementData) != null) {
              if ((hi = fence) < 0) {
                  mc = lst.modCount;
                  hi = lst.size;
              }
              else
                  mc = expectedModCount;
              if ((i = index) >= 0 && (index = hi) <= a.length) {
                  for (; i < hi; ++i) {
                      @SuppressWarnings("unchecked") E e = (E) a[i];
                      action.accept(e);
                  }
                  if (lst.modCount == mc)
                      return;
              }
          }
          throw new ConcurrentModificationException();
      }

      public long estimateSize() {
          return (long) (getFence() - index);
      }

      public int characteristics() {
          return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
      }
  }

}

Spliterator接口

1
2
3
4
5
6
public interface Spliterator<T> { 
	boolean tryAdvance(Consumer<? super T> action); 
	Spliterator<T> trySplit(); 
	long estimateSize(); 
	int characteristics(); 
}

与往常一样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为类似于普通的 Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍 历就返回true。但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分 给第二个 Spliterator (由该方法返回), 让它们两个并行处理。 Spliterator 还可通过 estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值 也有助于让拆分均匀一点。

重要的是,要了解这个拆分过程在内部是如何执行的,以便在需要时能够掌控它。因此,我 们会在下一节中详细地分析它。

拆分过程

将 Stream 拆分成多个部分的算法是一个递归过程, 如图7-6所示。 第一步是对第一个 Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用 trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit 直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过 程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812023011.png

这个拆分过程也受Spliterator本身的特性影响,而特性是通过characteristics方法声 明的。

Spliterator的特性

Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代 表Spliterator本身特性集的编码。使用Spliterator的客户可以用这些特性来更好地控制和 优化它的使用。表7-2总结了这些特性。(不幸的是,虽然它们在概念上与收集器的特性有重叠, 编码却不一样。)

https://gitee.com/lienhui68/picStore/raw/master/null/20200812023105.png

实现你自己的Spliterator

让我们来看一个可能需要你自己实现Spliterator的实际例子。我们要开发一个简单的方 法来数数一个String中的单词数。

迭代版本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class StringCounter {

    final static String SENTENCE = "Nel mezzo del cammin di nostra  vita mi ritrovai in una selva oscura ché la dritta via era smarrita";

    public static void main(String[] args) {
        System.out.println(countWorldIteratively(SENTENCE));
    }

    public static int countWorldIteratively(String s) {
        int count = 0;
        boolean lastSpace = true; // 初始为true,这样只要遇到字符就表示有一个单词。遇到第一个字符+1
        for (char c : s.toCharArray()) {
            if (Character.isWhitespace(c)) {
                lastSpace = true;
            } else {
                if (lastSpace) count++;
                lastSpace = false;
            }
        }
        return count;
    }
}

以函数式风格重写单词计数器

首先你需要把String转换成一个流。不幸的是,原始类型的流仅限于int、long和double, 所以你只能用首先你需要把String转换成一个流。不幸的是,原始类型的流仅限于int、long和double, 所以你只能用Stream<Character>

1
Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);

你可以对这个流做归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个int用来计算到目前为止数过的字数,还有一个boolean用来记得上一个遇到的Character是不是空 格。因为Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不需要包装对象), 所以你必须创建一个新类WordCounter来把这个状态封装起来,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class WordCounter {
    @Getter
    private final int count;
    private final boolean lastSpace;

    public WordCounter(int count, boolean lastSpace) {
        this.count = count;
        this.lastSpace = lastSpace;
    }

    // 和迭代算法一样, accumulate 方 法 一 个个遍历Character
    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(count, true);
        } else {
            // 上一个字符是空格,而 当前遍历的字符不是 空格时,将单词计数器 加一
            return lastSpace ? new WordCounter(count + 1, false) : this;
        }
    }

    // 合并两个 Word- Counter,把其 计数器加起来
    public WordCounter combine(WordCounter wordCounter) {
        // 仅需要计数器 的总和,无需关 心lastSpace
        return new WordCounter(count + wordCounter.count, wordCounter.lastSpace);
    }
}
// 运行结果
19

在这个列表中,accumulate方法定义了如何更改WordCounter的状态,或更确切地说是用 哪个状态来建立新的WordCounter,因为这个类是不可变的(后面还要坐并行测试))。每次遍历到Stream中的一个新的 Character 时, 就会调用 accumulate 方法。当上一个字符是空格,新字符不是空格时,计数器就加一。图7-7展示 了accumulate方法遍历到新的Character时,WordCounter的状态转换。

https://gitee.com/lienhui68/picStore/raw/master/null/20200812030400.png

调 用 第 二 个 方 法 combine 时 , 会 对 作 用 于 Character 流 的 两 个 不 同 子 部 分 的 两 个 WordCounter的部分结果进行汇总,也就是把两个WordCounter内部的计数器加起来。

现在你已经写好了在WordCounter中累计字符,以及在WordCounter中把它们结合起来的 逻辑,那写一个方法来归约Character流就很简单了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * 一个迭代式字数统计方法
 *
 * @author David Li
 * @create 2020/08/12
 */
public class StringCounter {

    final static String SENTENCE = "Nel mezzo del cammin di nostra  vita mi ritrovai in una selva oscura ché la dritta via era smarrita";


    public static void main(String[] args) {
        Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
        System.out.println(new StringCounter().countWorlds(stream));
    }

    private int countWorlds(Stream<Character> stream) {
        WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
        return wordCounter.getCount();
    }
}

class WordCounter {
    @Getter
    private final int count;
    private final boolean lastSpace;

    public WordCounter(int count, boolean lastSpace) {
        this.count = count;
        this.lastSpace = lastSpace;
    }

    // 和迭代算法一样, accumulate 方 法 一 个个遍历Character
    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(count, true);
        } else {
            // 上一个字符是空格,而 当前遍历的字符不是 空格时,将单词计数器 加一
            return lastSpace ? new WordCounter(count + 1, false) : this;
        }
    }

    // 合并两个 Word- Counter,把其 计数器加起来
    public WordCounter combine(WordCounter wordCounter) {
        // 仅需要计数器 的总和,无需关 心lastSpace
        return new WordCounter(count + wordCounter.count, wordCounter.lastSpace);
    }
}
// 运行结果
19

到现在为止都很好,但我们以函数式实现WordCounter的主要原因之一就是能轻松地并行 处理,让我们来看看具体是如何实现的。

让WordCounter并行工作

你可以尝试用并行流来加快字数统计,如下所示:

1
2
3
System.out.println(new StringCounter().countWorlds(stream.parallel()));
// 运行结果
27

显然有什么不对,可到底是哪里不对呢?问题的根源并不难找。因为原始的String在任意 位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而 把顺序流换成并行流就可能使结果出错。

如何解决这个问题呢?解决方案就是要确保String不是在随机位置拆开的,而只能在词尾 拆开。要做到这一点,你必须为Character实现一个Spliterator,它只能在两个词之间拆开 String(如下所示),然后由此创建并行流。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class WordCounterSpliterator implements Spliterator<Character> {

    private final String string;
    private int currentCharPos = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        // 处理当前字符
        action.accept(string.charAt(currentCharPos++));
        // 如果还有字符要处理,则返回true
        return currentCharPos < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentCharPos;
        // 返回null表示要解析的string已经足够小,可以顺序处理
        if (currentSize < 10) {
            return null;
        }
        //将试探拆分位置设定为要解析的String的中间,然后让拆分位置前进直到下 一个空格
        for (int splitPos = currentCharPos + currentSize / 2; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                // 创建一个新的WordCounterSpliterator来解析string从开始位置到拆分位置的部分
                Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentCharPos, splitPos));
                // 将拆分位置赋值给这个WordCounterSpliterator的起始位置
                currentCharPos = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize() {
        // 还剩多少个分配元素
        return string.length() - currentCharPos;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

这个Spliterator由要解析的String创建,并遍历了其中的Character,同时保存了当前 正 在 遍 历 的 字 符 位 置 。 让 我 们 快 速 回 顾 一 下 实 现 了 Spliterator 接 口 的 WordCounterSpliterator中的各个函数。

  • tryAdvance方法把String中当前位置的Character传给了Consumer,并让位置加一。 作为参数传递的Consumer是一个Java内部类,在遍历流时将要处理的Character传给了一系列要对其执行的函数。这里只有一个归约函数,即WordCounter类的accumulate 方法。 如果新的指针位置小于 String 的总长, 且还有要遍历的 Character , 则 tryAdvance返回true。
  • trySplit方法是Spliterator中最重要的一个方法,因为它定义了拆分要遍历的数据 结构的逻辑。就像在代码清单7-1中实现的RecursiveTask的compute方法一样(分支 /合并框架的使用方式),首先要设定不再进一步拆分的下限。这里用了一个非常低的下 限——10个Character, 仅仅是为了保证程序会对那个比较短的String做几次拆分。 在实际应用中,就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的 任务。如果剩余的 Character数量低于下限,你就返回null表示无需进一步拆分。相 反,如果你需要执行拆分,就把试探的拆分位置设在要解析的String 块的中间。但我 们没有直接使用这个拆分位置,因为要避免把词在中间断开,于是就往前找,直到找到 一个空格。一旦找到了适当的拆分位置,就可以创建一个新的 Spliterator 来遍历从 当前位置到拆分位置的子串;把当前位置 this设为拆分位置,因为之前的部分将由新 Spliterator来处理,最后返回。
  • 还需要遍历的元素的estimatedSize就是这个Spliterator解析的String的总长度和 当前遍历的位置的差。
  • 最后,characteristic方法告诉框架这个Spliterator是ORDERED(顺序就是String 中各个 Character 的次序)、 SIZED ( estimatedSize 方法的返回值是精确的)、 SUBSIZED(trySplit方法创建的其他Spliterator也有确切大小)、NONNULL(String 中 不 能 有 为 null 的 Character ) 和 IMMUTABLE ( 在 解 析 String 时 不 能 再 添 加 Character,因为String本身是一个不可变类)的。

运用WordCounterSpliterator

现在就可以用这个新的WordCounterSpliterator来处理并行流了,如下所示:

1
2
3
4
5
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
System.out.println(new StringCounter().countWorlds(stream));
// 运行结果
19

传给StreamSupport.stream工厂方法的第二个布尔参数意味着你想创建一个并行流。

完整代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
package com.eh.eden.java8.demo;

import lombok.Getter;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * 一个迭代式字数统计方法
 *
 * @author David Li
 * @create 2020/08/12
 */
public class StringCounter {

    final static String SENTENCE = "Nel mezzo del cammin di nostra  vita mi ritrovai in una selva oscura ché la dritta via era smarrita";


    public static void main(String[] args) {
        Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
        Stream<Character> stream = StreamSupport.stream(spliterator, true);
        System.out.println(new StringCounter().countWorlds(stream));
    }

    private int countWorlds(Stream<Character> stream) {
        WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
        return wordCounter.getCount();
    }
}

class WordCounter {
    @Getter
    private final int count;
    private final boolean lastSpace;

    public WordCounter(int count, boolean lastSpace) {
        this.count = count;
        this.lastSpace = lastSpace;
    }

    // 和迭代算法一样, accumulate 方 法 一 个个遍历Character
    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(count, true);
        } else {
            // 上一个字符是空格,而 当前遍历的字符不是 空格时,将单词计数器 加一
            return lastSpace ? new WordCounter(count + 1, false) : this;
        }
    }

    // 合并两个 Word- Counter,把其 计数器加起来
    public WordCounter combine(WordCounter wordCounter) {
        // 仅需要计数器 的总和,无需关 心lastSpace
        return new WordCounter(count + wordCounter.count, wordCounter.lastSpace);
    }
}

class WordCounterSpliterator implements Spliterator<Character> {

    private final String string;
    private int currentCharPos = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        // 处理当前字符
        action.accept(string.charAt(currentCharPos++));
        // 如果还有字符要处理,则返回true
        return currentCharPos < string.length();
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentCharPos;
        // 返回null表示要解析的string已经足够小,可以顺序处理
        if (currentSize < 10) {
            return null;
        }
        //将试探拆分位置设定为要解析的String的中间,然后让拆分位置前进直到下 一个空格
        for (int splitPos = currentCharPos + currentSize / 2; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                // 创建一个新的WordCounterSpliterator来解析string从开始位置到拆分位置的部分
                Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentCharPos, splitPos));
                // 将拆分位置赋值给这个WordCounterSpliterator的起始位置
                currentCharPos = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize() {
        // 还剩多少个分配元素
        return string.length() - currentCharPos;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

你已经看到了Spliterator如何让你控制拆分数据结构的策略。Spliterator还有最后一 个值得注意的功能,就是可以在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数 据源,而不是在创建时就绑定。这种情况下,它称为延迟绑定(late-binding)的Spliterator。

小结

在本章中,你了解了以下内容。

  • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
  • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
  • 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎总是比尝试并行化某些操作更为重要。
  • 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
  • Spliterator定义了并行流如何拆分它要遍历的数据。