目录

用流收集数据

本章内容

  • 用Collectors类创建和使用收集器
  • 将数据流归约为一个值
  • 汇总:归约的特殊情况
  • 数据分组和分区
  • 开发自己的自定义收集器

我们在前一章中学到,流可以用类似于数据库的操作帮助你处理集合。你可以把Java 8的流 看作花哨又懒惰的数据集迭代器。它们支持两种类型的操作:中间操作(如filter或map)和 终端操作(如count、findFirst、forEach和reduce)。中间操作可以链接起来,将一个流转换为另一个流。这些操作不会消耗流,其目的是建立一个流水线。与此相反,终端操作会消耗流,以产生一个最终结果,例如返回流中的最大元素。它们通常可以通过优化流水线来缩短 计算时间。

在本章中,你会发现collect是一个归约操作,就像reduce一样可以接 受各种做法作为参数, 将流中的元素累积成一个汇总结果。 具体的做法是通过定义新的 Collector接口来定义的,因此区分Collection、Collector和collect是很重要的。


收集器简介

结果——“做什么”,而不用操心执行的步骤——“如何做”。

要是做多级分组,指令式和函数式之间的区别就会更加明显:由于需要好多层嵌套循环和条 件,指令式代码很快就变得更难阅读、更难维护、更难修改。相比之下,函数式版本只要再加上 一个收集器就可以轻松地增强功能了。

收集器用作高级归约

刚刚的结论又引出了优秀的函数式API设计的另一个好处:更易复合和重用。收集器非常有 用,因为用它可以简洁而灵活地定义collect用来生成结果集合的标准。更具体地说,对流调用 collect方法将对流中的元素触发一个归约操作(由Collector来参数化)。图6-1所示的归约操 作所做的工作和代码清单6-1中的指令式代码一样。它遍历流中的每个元素,并让Collector进行处理。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810153856.png

reduce方法:

1
T reduce(T identity, BinaryOperator<T> accumulator);

Collector接口:

1
2
3
4
5
6
public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();

预定义收集器

在本章剩下的部分中,我们主要探讨预定义收集器的功能,也就是那些可以从Collectors 类提供的工厂方法(例如groupingBy)创建的收集器。它们主要提供了三大功能:

  • 将流元素归约和汇总为一个值
  • 元素分组
  • 元素分区

归约和汇总

在需要将流项目重组成集合时,一般会使用收集器(Stream方法collect 的参数)。再宽泛一点来说,但凡要把流中所有的项目合并成一个结果时就可以用。这个结果可以是任何类型,可以复杂如代表一棵树的多级映射(多级映射),或是简单如一个整数。

我们先来举一个简单的例子,利用counting工厂方法返回的收集器,数一数菜单里有多少 种菜:

1
long howManyDishes = menu.stream().collect(Collectors.counting());

这还可以写得更为直接:

1
long howManyDishes = menu.stream().count();

counting收集器在和其他收集器联合使用的时候特别有用,后面会谈到这一点。

在本章后面的部分,我们假定你已导入了Collectors类的所有静态工厂方法:

1
import static java.util.stream.Collectors.*;

这样你就可以写counting()而用不着写Collectors.counting()之类的了。

查找流中的最大值和最小值

假设你想要找出菜单中热量最高的菜。 你可以使用两个收集器, Collectors.maxBy 和 Collectors.minBy,来计算流中的最大或最小值。这两个收集器接收一个Comparator参数来 比较流中的元素。你可以创建一个Comparator来根据所含热量对菜肴进行比较,并把它传递给 Collectors.maxBy:

1
2
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish = menu.stream().collect(maxBy(dishCaloriesComparator));

汇总

Collectors类专门为汇总提供了一个工厂方法:Collectors.summingInt。它可接受一 个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递给普通的collect方 法后即执行我们需要的汇总操作。举个例子来说,你可以这样求出菜单列表的总热量:

1
int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));

Collectors.summingLong和Collectors.summingDouble方法的作用完全一样,可以用 于求和字段为long或double的情况。

但汇总不仅仅是求和;还有Collectors.averagingInt,连同对应的averagingLong和 averagingDouble可以计算数值的平均数:

1
double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));

很多时候,你可能想要得到两个或更多这样的 结果,而且你希望只需一次操作就可以完成。在这种情况下,你可以使用summarizingInt工厂 方法返回的收集器。例如,通过一次summarizing操作你可以就数出菜单中元素的个数,并得 到菜肴热量总和、平均值、最大值和最小值:

1
IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));

这个收集器会把所有这些信息收集到一个叫作IntSummaryStatistics的类里,它提供了 方便的取值(getter)方法来访问结果。打印menuStatisticobject会得到以下输出:

1
IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}

同样,相应的summarizingLong和summarizingDouble工厂方法有相关的LongSummary- Statistics 和 DoubleSummaryStatistics 类型, 适用于收集的属性是原始类型 long 或 double的情况。

连接字符串

joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符 串连接成一个字符串。这意味着你把菜单中所有菜肴的名称连接起来,如下所示:

1
String shortMenu = menu.stream().map(Dish::getName).collect(joining());

请注意,joining在内部使用了StringBuilder来把生成的字符串逐个追加起来。此外还 要注意,如果Dish类有一个toString方法来返回菜肴的名称,那你无需用提取每一道菜名称的 函数来对原流做映射就能够得到相同的结果:

1
String shortMenu = menu.stream().collect(joining());

二者均可产生以下字符串:

1
porkbeefchickenfrench friesriceseason fruitpizzaprawnssalmon

但该字符串的可读性并不好。幸好,joining工厂方法有一个重载版本可以接受元素之间的 分界符,这样你就可以得到一个逗号分隔的菜肴名称列表:

1
String shortMenu = menu.stream().map(Dish::getName).collect(joining(", "));

正如我们预期的那样,它会生成:

1
pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon

广义的归约和汇总

事实上,我们已经讨论的所有收集器,都是一个可以用reducing工厂方法定义的归约过程 的特殊情况而已。Collectors.reducing工厂方法是所有这些特殊情况的一般化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    public static <T, U>
    Collector<T, ?, U> reducing(U identity,
                                Function<? super T, ? extends U> mapper,
                                BinaryOperator<U> op) {
        return new CollectorImpl<>(
                boxSupplier(identity),
                (a, t) -> { a[0] = op.apply(a[0], mapper.apply(t)); },
                (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
                a -> a[0], CH_NOID);
    }

可以说,先 前讨论的案例仅仅是为了方便程序员而已。(但是,请记得方便程序员和可读性是头等大事!)例 如,可以用reducing方法创建的收集器来计算你菜单的总热量,如下所示:

1
int totalCalories = menu.stream().collect(reducing( 0, Dish::getCalories, (i, j) -> i + j));

它需要三个参数。

  • 第一个参数是归约操作的起始值,也是流中没有元素时的返回值,所以很显然对于数值和而言0是一个合适的值。

  • 第二个参数就是你在6.2.2节中使用的函数,将菜肴转换成一个表示其所含热量的int。

  • 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。这里它就是对两个int求和。 同样,你可以使用下面这样单参数形式的reducing来找到热量最高的菜,如下所示:

    1
    2
    
    Optional<Dish> mostCalorieDish = menu.stream().collect(reducing( 
          (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));
    

你可以把单参数reducing工厂方法创建的收集器看作三参数方法的特殊情况,它把流中的 第一个项目作为起点,把恒等函数(即一个函数仅仅是返回其输入参数)作为一个转换函数。这 也意味着,要是把单参数reducing收集器传递给空流的collect方法,收集器就没有起点;正 如我们在6.2.1节中所解释的,它将因此而返回一个Optional<Dish>对象。

收集和归约

归约定义

1
2
3
<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

Stream接口的collect和reduce方法有何不同,因为两种方法通常会获得相同的结果。例如,你可以像下面这样使用reduce方法来实现toListCollector所做的工作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6).stream();
        List<Integer> numbers = stream.reduce(
                new ArrayList<Integer>(),
                (List<Integer> l, Integer e) -> {
                    l.add(e);
                    return l;
                },
                (List<Integer> l1, List<Integer> l2) -> {
                    l1.addAll(l2);
                    return l1;
                }
        );

这个解决方案有两个问题:一个语义问题和一个实际问题。

语义问题在于,reduce方法旨在把两个值结合起来生成一个新值,它是一个不可变的归约。与此相反,collect方法的设]计就是要改变容器,从而累积要输出的结果。这意味着,上面的代码片段是在滥用reduce方法,因为它在原地改变了作为累加器的List。

你在下一章中会更详细地看到,以错误的语义使用reduce方法还会造成一个实际问题:这个归约过程不能并行工作,因为由多个线程并发修改同一个数据结构可能会破坏List本身。在这种情况下,如果你想要线程安全,就需要每次分配一个新的List,而对象分配又会影响性能。这就是collect方法特别适合表达可变容器上的归约的原因,更关键的是它适合并行操作,本章后面会谈到这一点。

  1. 收集框架的灵活性:以不同的方法执行同样的操作

    你还可以进一步简化前面使用 reducing 收集器的求和例子——引用 Integer 类的 sum 方 法,而不用去写一个表达同一操作的Lambda表达式。这会得到以下程序:

    https://gitee.com/lienhui68/picStore/raw/master/null/20200810160939.png

    从逻辑上说,归约操作的工作原理如图6-3所示:利用累积函数,把一个初始化为起始值的 累加器,和把转换函数应用到流中每个元素上得到的结果不断迭代合并起来。

    https://gitee.com/lienhui68/picStore/raw/master/null/20200810160958.png

    在现实中,我们在6.2节开始时提到的counting收集器也是类似地利用三参数reducing工厂 方法实现的。它把流中的每个元素都转换成一个值为1的Long型对象,然后再把它们相加:

    1
    2
    3
    4
    
    public static <T> Collector<T, ?, Long>
        counting() {
            return reducing(0L, e -> 1L, Long::sum);
        }
    

    使用泛型?通配符

    在刚刚提到的代码片段中,你可能已经注意到了?通配符,它用作counting工厂方法返回的收集器签名中的第二个泛型类型。对这种记法你应该已经很熟悉了,特别是如果你经常使用Java的集合框架的话。在这里,它仅仅意味着收集器的累加器类型未知,换句话说,累加器本身可以是任何类型。我们在这里原封不动地写出了Collectors类中原始定义的方法签名,但在本章其余部分我们将避免使用任何通配符表示法,以使讨论尽可能简单。

    我们在第5章已经注意到,还有另一种方法不使用收集器也能执行相同操作——将菜肴流映 射为每一道菜的热量,然后用前一个版本中使用的方法引用来归约得到的流:

    1
    
    int totalCalories = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();
    

    最后,更简洁的方法是 把流映射到一个IntStream,然后调用sum方法,你也可以得到相同的结果:

    1
    
    int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum();
    
  2. 根据情况选择最佳解决方案

    这再次说明了,函数式编程(特别是Java 8的Collections框架中加入的基于函数式风格原理设计的新API)通常提供了多种方法来执行同一个操作。这个例子还说明,收集器在某种程度 上比Stream接口上直接提供的方法用起来更复杂,但好处在于它们能提供更高水平的抽象和概括,也更容易重用和自定义。

    我们的建议是,尽可能为手头的问题探索不同的解决方案,但在通用的方案里面,始终选择最专门化的一个。无论是从可读性还是性能上看,这一般都是最好的决定。例如,要计菜单的总热量,我们更倾向于最后一个解决方案(使用IntStream),因为它最简明,也很可能最易读。 同时,它也是性能最好的一个,因为IntStream可以让我们避免自动拆箱操作,也就是从Integer 到int的隐式转换,它在这里毫无用处。

分组

一个常见的数据库操作是根据一个或多个属性对集合中的项目进行分组。就像前面讲到按货 币对交易进行分组的例子一样,如果用指令式风格来实现的话,这个操作可能会很麻烦、啰嗦而 且容易出错。但是,如果用Java 8所推崇的函数式风格来重写的话,就很容易转化为一个非常容 易看懂的语句。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810161826.png

我们把这个Function叫作分类函数,因为它用来把流中的元素分成不 同的组。如图6-4所示,分组操作的结果是一个Map,把分组函数返回的值作为映射的键,把流中 所有具有这个分类值的项目的列表作为对应的映射值。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810161944.png

但是,分类函数不一定像方法引用那样可用,因为你想用以分类的条件可能比简单的属性访 问器要复杂。例如,你可能想把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700 卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。由于Dish类的作者 没有把这个操作写成一个方法,你无法使用方法引用,但你可以把这个逻辑写成Lambda表达式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class MenuDemo {

    public enum CaloricLevel {DIET, NORMAL, FAT}

    public static void main(String[] args) {
        List<Dish> menu = Arrays.asList(
                new Dish("pork", false, 800, Dish.Type.MEAT),
                new Dish("beef", false, 700, Dish.Type.MEAT),
                new Dish("chicken", false, 400, Dish.Type.MEAT),
                new Dish("french fries", true, 530, Dish.Type.OTHER),
                new Dish("rice", true, 350, Dish.Type.OTHER),
                new Dish("season fruit", true, 120, Dish.Type.OTHER),
                new Dish("pizza", true, 550, Dish.Type.OTHER),
                new Dish("prawns", false, 300, Dish.Type.FISH),
                new Dish("salmon", false, 450, Dish.Type.FISH));

        /**
         * 你可能想把热量不到400卡路里的菜划分为“低热量”(diet),
         * 热量400到700 卡路里的菜划为“普通”(normal),
         * 高于700卡路里的划为“高热量”(fat)。
         */
        Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream()
                .collect(groupingBy(t -> {
                    if (t.getCalories() < 400) {
                        return CaloricLevel.DIET;
                    } else if (t.getCalories() >= 400 && t.getCalories() < 700) {
                        return CaloricLevel.NORMAL;
                    } else
                        return CaloricLevel.FAT;
                }));
        System.out.println();
    }
}

@Data
class Dish {

    private String name;
    private boolean vegetarian;
    private int calories;
    private Type type;

    public Dish(String name, boolean vegetarian, int calories, Type type) {
        this.name = name;
        this.vegetarian = vegetarian;
        this.calories = calories;
        this.type = type;
    }

    public enum Type {MEAT, FISH, OTHER}
}

多级分组

如何对菜单中的菜肴按照类型和热量同时进行分组

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream()
                .collect(groupingBy(Dish::getType,
                        groupingBy(t -> {
                            if (t.getCalories() < 400) {
                                return CaloricLevel.DIET;
                            } else if (t.getCalories() >= 400 && t.getCalories() < 700) {
                                return CaloricLevel.NORMAL;
                            } else
                                return CaloricLevel.FAT;
                        })));

这里的外层Map的键就是第一级分类函数生成的值:“fish, meat, other”,而这个Map的值又是 一个Map,键是二级分类函数生成的值:“normal, diet, fat”。最后,第二级map的值是流中元素构 成的List,是分别应用第一级和第二级分类函数所得到的对应第一级和第二级键的值:“salmon、 pizza…” 这种多级分组操作可以扩展至任意层级,n级分组就会得到一个代表n级树形结构的n级 Map。

图6-5显示了为什么结构相当于n维表格,并强调了分组操作的分类目的。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810165302.png

一般来说,把groupingBy看作“桶”比较容易明白。第一个groupingBy给每个键建立了 一个桶。然后再用下游的收集器去收集每个桶中的元素,以此得到n级分组。

按子组收集数据

在上一节中,我们看到可以把第二个 groupingBy收集器传递给外层收集器来实现多级分 组。但进一步说,传递给第一个groupingBy的第二个收集器可以是任何类型,而不一定是另一个 groupingBy 。 例如, 要数一数菜单中每类菜有多少个, 可以传递 counting 收集器作为 groupingBy收集器的第二个参数:

1
Map<Dish.Type, Long> typesCount = menu.stream().collect( groupingBy(Dish::getType, counting()));

还要注意,普通的单参数groupingBy(f)(其中f是分类函数)实际上是groupingBy(f, toList())的简便写法。

再举一个例子,你可以把前面用于查找菜单中热量最高的菜肴的收集器改一改,按照菜的类 型分类:

1
2
Map<Dish.Type, Optional<Dish>> mostCaloricByType = menu.stream()
                .collect(groupingBy(Dish::getType, maxBy(Comparator.comparing(Dish::getCalories))));

这个分组的结果显然是一个map,以Dish的类型作为键,以包装了该类型中热量最高的Dish 的Optional<Dish>作为值:

这个 Map 中的值是 Optional ,因为这是 maxBy 工厂方法生成的收集器的类型,但实际上, 如果菜单中没有某一类型的 Dish ,这个类型就不会对应一个 Optional. empty() 值, 而且根本不会出现在 Map 的键中。 groupingBy 收集器只有在应用分组条件后,第一次在 流中找到某个键对应的元素时才会把键加入分组 Map 中。这意味着 Optional 包装器在这 里不是很有用,因为它不会仅仅因为它是归约收集器的返回类型而表达一个最终可能不 存在却意外存在的值。

把收集器的结果转换为另一种类型

因为分组操作的Map结果中的每个值上包装的Optional没什么用,所以你可能想要把它们 去掉。要做到这一点,或者更一般地来说,把收集器返回的结果转换为另一种类型,你可以使用 Collectors.collectingAndThen工厂方法返回的收集器,如下所示。

1
2
3
Map<Dish.Type, Dish> mostCaloricByType = menu.stream()
                .collect(groupingBy(Dish::getType,
                        collectingAndThen(maxBy(Comparator.comparing(Dish::getCalories)), Optional::get)));

这个工厂方法接受两个参数——要转换的收集器以及转换函数,并返回另一个收集器。这个 收集器相当于旧收集器的一个包装,collect操作的最后一步就是将返回值用转换函数做一个映 射。在这里,被包起来的收集器就是用maxBy建立的那个,而转换函数Optional::get则把返 回的Optional中的值提取出来。前面已经说过,这个操作放在这里是安全的,因为reducing 收集器永远都不会返回Optional.empty()。

把好几个收集器嵌套起来很常见,它们之间到底发生了什么可能不那么明显。图6-6可以直 观地展示它们是怎么工作的。从最外层开始逐层向里,注意以下几点。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810170323.png

  • 收集器用虚线表示,因此groupingBy是最外层,根据菜肴的类型把菜单流分组,得到三 个子流。

  • groupingBy收集器包裹着collectingAndThen收集器,因此分组操作得到的每个子流 都用这第二个收集器做进一步归约。

  • collectingAndThen收集器又包裹着第三个收集器maxBy。

  • 随后由归约收集器进行子流的归约操作,然后包含它的collectingAndThen收集器会对 其结果应用Optional:get转换函数。

  • 对三个子流分别执行这一过程并转换而得到的三个值, 也就是各个类型中热量最高的 Dish,将成为groupingBy收集器返回的Map中与各个分类键(Dish的类型)相关联的值。

与groupingBy联合使用的其他收集器的例子

一般来说,通过groupingBy工厂方法的第二个参数传递的收集器将会对分到同一组中的所 有流元素执行进一步归约操作。例如,你还重用求出所有菜肴热量总和的收集器,不过这次是对 每一组Dish求和:

1
2
Map<Dish.Type, Integer> totalCaloriesByType =
menu.stream().collect(groupingBy(Dish::getType, summingInt(Dish::getCalories)));

然而常常和groupingBy联合使用的另一个收集器是mapping方法生成的。

这个方法接受两 个参数:一个函数对流中的元素做变换,另一个则将变换的结果对象收集起来。

其目的是在累加 之前对每个输入元素应用一个映射函数,这样就可以让接受特定类型元素的收集器适应不同类型的对象。

我们来看一个使用这个收集器的实际例子。比方说你想要知道,对于每种类型的Dish, 菜单中都有哪些CaloricLevel。我们可以把groupingBy和mapping收集器结合起来,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream()
                .collect(groupingBy(Dish::getType,
                        mapping(t -> {
                            if (t.getCalories() < 400) {
                                return CaloricLevel.DIET;
                            } else if (t.getCalories() >= 400 && t.getCalories() < 700) {
                                return CaloricLevel.NORMAL;
                            } else
                                return CaloricLevel.FAT;
                        }, toSet())
                ));

请注意在上 一个示例中,对于返回的Set是什么类型并没有任何保证。但通过使用toCollection,你就可 以有更多的控制。例如,你可以给它传递一个构造函数引用来要求HashSet:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream()
                .collect(groupingBy(Dish::getType,
                        mapping(t -> {
                            if (t.getCalories() < 400) {
                                return CaloricLevel.DIET;
                            } else if (t.getCalories() >= 400 && t.getCalories() < 700) {
                                return CaloricLevel.NORMAL;
                            } else
                                return CaloricLevel.FAT;
                        }, toCollection(HashSet::new))
                ));

分区

https://gitee.com/lienhui68/picStore/raw/master/null/20200810171700.png

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函 数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以 分为两组——true是一组,false是一组。

请注意,用同样的分区谓词,对菜单List创建的流作筛选,然后把结果收集到另外一个List 中也可以获得相同的结果

分区的优势

分区的好处在于保留了分区函数返回true或false的两套流元素列表。在上一个例子中,要 得到非素食Dish的List,你可以使用两个筛选操作来访问partitionedMenu这个Map中false 键的值:一个利用谓词,一个利用该谓词的非。而且就像你在分组中看到的,partitioningBy 工厂方法有一个重载版本,可以像下面这样传递第二个收集器:

https://gitee.com/lienhui68/picStore/raw/master/null/20200810172013.png

将数字按质数和非质数分开

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.eh.eden.java8.demo;

import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.partitioningBy;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/10
 */
public class PrimeDemo {
    /**
     * 将前n个数分成质数和非质数
     *
     * @param n
     * @return
     */
    public Map<Boolean, List<Integer>> partitionPrimes(int n) {
        return IntStream.rangeClosed(2, n).boxed()
                .collect(partitioningBy(
                        candidate -> isPrime(candidate)
                ));
    }

    /**
     * 判断一个数是否是是否是质数
     *
     * @param candidate
     * @return
     */
    public boolean isPrime(int candidate) {
        int candidateRoot = (int) Math.sqrt(candidate);
        return IntStream.rangeClosed(2, candidateRoot).noneMatch(i -> candidate % i == 0);
    }

    public static void main(String[] args) {
        PrimeDemo primeDemo = new PrimeDemo();
        Map<Boolean, List<Integer>> map = primeDemo.partitionPrimes(20);
        map.forEach((k, v) -> System.out.print(k + ":" + v + "\n"));
    }
}

运行结果

1
2
false:[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20]
true:[2, 3, 5, 7, 11, 13, 17, 19]

Collectors类的静态工厂方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static <T> Collector<T, ?, List<T>> toList() { // 把流中所有项目收集到一个 List
public static <T> Collector<T, ?, Set<T>> toSet() { // 把流中所有项目收集到一个 Set,删除重复项
public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
  // 把流中所有项目收集到给定的供应源创建的集合
public static <T> Collector<T, ?, Long> counting() { // 计算流中元素的个数
public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper) {
  // 对流中项目的一个整数属性求和
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
  // 计算流中项目 Integer 属性的平均值
public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
  // 收集关于流中项目 Integer 属性的统计值,例如最大、最小、 总和与平均值
public static Collector<CharSequence, ?, String> joining() { // 连接对流中每个项目调用 toString 方法所生成的字符串
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator) {
  // 按照给定比较器选出的最大元素
public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator) {
  // 按照给定比较器选出的最小元素
public static <T, U> Collector<T, ?, U> reducing(U identity, Function<? super T, ? extends U> mapper, BinaryOperator<U> op) { // 从一个作为累加器的初始值开始,利用 BinaryOperator 与流 中的元素逐个结合,从而将流归约为单个值
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream, Function<R,RR> finisher) { // 包裹另一个收集器,对其结果应用转换函数
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
  // 根据项目的一个属性的值对流中的项目作分组,并将属性值作 为结果 Map 的键
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                    Collector<? super T, A, D> downstream) {
  // 根据对流中每个项目应用谓词的结果来对项目进行分区

收集器接口

Collector接口包含了一系列方法,为实现具体的归约操作(即收集器)提供了范本。我们已经看过了Collector接口中实现的许多收集器,例如toList或groupingBy。这也意味着, 你可以为Collector接口提供自己的实现,从而自由地创建自定义归约操作。

要开始使用Collector接口,我们先看看本章开始时讲到的一个收集器——toList工厂方 法,它会把流中的所有元素收集成一个List。我们当时说在日常工作中经常会用到这个收集器, 而且它也是写起来比较直观的一个,至少理论上如此。通过仔细研究这个收集器是怎么实现的, 我们可以很好地了解Collector接口是怎么定义的,以及它的方法所返回的函数在内部是如何为 collect方法所用的。

首先让我们在下面的列表中看看Collector接口的定义,它列出了接口的签名以及声明的五 个方法。

1
2
3
4
5
6
7
public interface Collector<T, A, R> {
	Supplier<A> supplier(); 
	BiConsumer<A, T> accumulator(); 
	Function<A, R> finisher(); 
	BinaryOperator<A> combiner(); 
	Set<Characteristics> characteristics(); 
}
  • T是流中要收集的项目的泛型。
  • A是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
  • R是收集操作得到的对象(通常但并不一定是集合)的类型。

例如,你可以实现一个ToListCollector<T>类,将Stream<T>中的所有元素收集到一个 List<T>里,它的签名如下:

1
public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

我们很快就会澄清,这里用于累积的对象也将是收集过程的最终结果。

理解Collector接口声明的方法

现在我们可以一个个来分析Collector接口声明的五个方法了。通过分析,你会注意到,前 四个方法都会返回一个会被collect方法调用的函数,而第五个方法characteristics则提供 了一系列特征,也就是一个提示列表,告诉collect方法在执行归约操作的时候可以应用哪些优 化(比如并行化)。

  1. 建立新的结果容器:supplier方法

    supplier方法必须返回一个结果为空的Supplier,也就是一个无参数函数,在调用时它会 创建一个空的累加器实例,供数据收集过程使用。很明显,对于将累加器本身作为结果返回的收 集器,比如我们的ToListCollector,在对空流执行操作的时候,这个空的累加器也代表了收集过程的结果。在我们的ToListCollector中,supplier返回一个空的List,如下所示:

    1
    
    public Supplier<List<T>> supplier() { return ArrayList::new; }
    
  2. 将元素添加到结果容器:accumulator方法

    accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行 时会有两个参数:保存归约结果的累加器(已收集了流中的前 n1 个项目),还有第n个元素本身。 该函数将返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的 元素的效果。对于ToListCollector,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:

    1
    2
    3
    
    public BiConsumer<List<T>, T> accumulator() { return (list, item) -> list.add(item); }
    // or
    public BiConsumer<List<T>, T> accumulator() { return List::add; }
    
  3. 对结果容器应用最终转换:finisher方法

    在遍历完流后,finisher方法必须返回在累积过程的最后要调用的一个函数,以便将累加 器对象转换为整个集合操作的最终结果。通常,就像ToListCollector的情况一样,累加器对 象恰好符合预期的最终结果,因此无需进行转换。所以finisher方法只需返回identity函数:

    1
    
    public Function<List<T>, List<T>> finisher() { return Function.identity(); }
    

    这三个方法已经足以对流进行顺序归约,至少从逻辑上看可以按图6-7进行。实践中的实现 细节可能还要复杂一点,一方面是因为流的延迟性质,可能在collect操作之前还需要完成其他 中间操作的流水线,另一方面则是理论上可能要进行并行归约。

    https://gitee.com/lienhui68/picStore/raw/master/null/20200810181312.png

  4. 合并两个结果容器:combiner方法

    四个方法中的最后一个——combiner方法会返回一个供归约操作使用的函数,它定义了对 流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。对于toList而言, 这个方法的实现非常简单,只要把从流的第二个部分收集到的项目列表加到遍历第一部分时得到 的列表后面就行了:

    1
    2
    3
    
    public BinaryOperator<List<T>> combiner() { return 
         (list1, list2) ->  { list1.addAll(list2); return list1; } 
    }
    

    有了这第四个方法,就可以对流进行并行归约了。它会用到Java 7中引入的分支/合并框架和 Spliterator抽象,我们会在下一章中讲到。这个过程类似于图6-8所示,这里会详细介绍。

    https://gitee.com/lienhui68/picStore/raw/master/null/20200810181437.png

    • 原始流会以递归方式拆分为子流,直到定义流是否需要进一步拆分的一个条件为非(如果分布式工作单位太小,并行计算往往比顺序计算要慢,而且要是生成的并行任务比处理器内核数多很多的话就毫无意义了)。
    • 现在,所有的子流都可以并行处理,即对每个子流应用图6-7所示的顺序归约算法。
    • 最后,使用收集器combiner方法返回的函数,将所有的部分结果两两合并。这时会把原始流每次拆分时得到的子流对应的结果合并起来。
  5. characteristics方法

    characteristics会返回一个不可变的Characteristics集合,它定义 了收集器的行为——尤其是关于流是否可以并行归约, 以及可以使用哪些优化的提示。 Characteristics是一个包含三个项目的枚举。

    • UNORDERED——归约结果不受流中项目的遍历和累积顺序的影响。

    • CONCURRENT——accumulator函数可以从多个线程同时调用,且该收集器可以并行归 约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。

    • IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种 情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检 查地转换为结果R是安全的。

    我们迄今开发的 ToListCollector 是 IDENTITY_FINISH 的, 因为用来累积流中元素的List已经是我们要的最终结果,用不着进一步转换了,但它并不是UNORDERED,因为用在有序 流上的时候,我们还是希望顺序能够保留在得到的List中。最后,它是CONCURRENT的,但我们 刚才说过了,仅仅在背后的数据源无序时才会并行处理。

全部融合到一起

前一小节中谈到的五个方法足够我们开发自己的ToListCollector了。你可以把它们都融 合起来,如下面的代码清单所示。

https://gitee.com/lienhui68/picStore/raw/master/null/20200810181844.png

请注意,这个实现与Collectors.toList方法并不完全相同,但区别仅仅是一些小的优化。 这些优化的一个主要方面是Java API所提供的收集器在需要返回空列表时使用了Collections.emptyList()这个单例(singleton)。这意味着它可安全地替代原生Java,来收集菜单流中的所 有Dish的列表:

1
List<Dish> dishes = menuStream.collect(new ToListCollector<Dish>());

这个实现和标准的List<Dish> dishes = menuStream.collect(toList());构造之间的其他差异在于toList是一个工厂,而ToListCollector必须用new来实例化。

进行自定义收集而不去实现Collector

对于IDENTITY_FINISH的收集操作,还有一种方法可以得到同样的结果而无需从头实现新 的Collectors接口。Stream有一个重载的collect方法可以接受另外三个函数——supplier、 accumulator和combiner,其语义和Collector接口的相应方法返回的函数完全相同。所以比 如说,我们可以像下面这样把菜肴流中的项目收集到一个List中:

https://gitee.com/lienhui68/picStore/raw/master/null/20200810182110.png

我们认为,这第二种形式虽然比前一个写法更为紧凑和简洁,却不那么易读。此外,以恰当 的类来实现自己的自定义收集器有助于重用并可避免代码重复。另外值得注意的是,这第二个 collect 方法不能传递任何 Characteristics , 所以它永远都是一个 IDENTITY_FINISH 和 CONCURRENT但并非UNORDERED的收集器。

开发你自己的收集器以获得更好的性能

实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.eh.eden.java8.demo;

import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.IntStream;

/**
 * 自定义收集器来对前n个数进行分组,分组条件是判断是否是质数
 *
 * @author David Li
 * @create 2020/08/10
 */
public class PrimeNumbersCollector implements Collector<Integer, Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> {

    /**
     * supplier方法会返回一个在调用 时创建累加器的函数
     *
     * @return
     */
    @Override
    public Supplier<Map<Boolean, List<Integer>>> supplier() {
        return () -> new HashMap<Boolean, List<Integer>>() {
            {
                put(true, new ArrayList<>());
                put(false, new ArrayList<>());
            }
        };
    }

    @Override
    public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
        /**
         * 收集器中最重要的方法是accumulator,因 为它定义了如何收集流中元素的逻辑。
         * 这里它也是实现前面所讲的优化的关键。现在在任何一次迭代中,都可以访问收集过程的部分结果,也就是包含迄今找到的质数的累加器
         * acc.get(true)返回上一次迭代中收集到的质数
         * isPrime(acc.get(true), candidate) 判断当前数是否是质数
         * acc.get(isPrime(acc.get(true), candidate)).add(candidate); 添加对对应的List中
         */
        return (acc, candidate) -> acc.get(isPrime(acc.get(true), candidate)).add(candidate);
    }

    /**
     * 在并行收集时把两个部分累加器合并起来,这里,它只需要合并两个Map,
     * 即 将第二个Map中质数和非质数列表中的所有数字合并到第一个Map的对应列表中就行了:
     * 请注意,实际上这个收集器是不能并行使用的,因为该算法本身是顺序的。这意味着永远都 不会调用 combiner 方法,
     * 你可以把它的实现留空(更好的做法是抛出一个 Unsupported- OperationException异常)。
     * 为了让这个例子完整,还是决定实现它。
     *
     * @return
     */
    @Override
    public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
        return (m1, m2) -> {
            m1.get(true).addAll(m2.get(true));
            m1.get(false).addAll(m2.get(false));
            return m1;
        };
    }

    @Override
    public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
        // 恒等函数
        return Function.identity();
    }

  	/**
  	* 就characteristics方法而言,我们已经说过,它既不是CONCURRENT也不是UNORDERED, 但却是IDENTITY_FINISH的
  	*/
    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH);
    }

    /**
     * 使用质数作为除数 来判断一个数是否是质数 (非质数本身就不是质数无需判断)
     *
     * @param primes    在添加candidate之前的那个集合
     * @param candidate
     * @return
     */
    public static boolean isPrime(List<Integer> primes, int candidate) {
        int candidateRoot = (int) Math.sqrt((double) candidate);
        return takeWhile(primes, i -> i <= candidateRoot).stream()
                .noneMatch(i -> candidate % i == 0);
    }

    /**
     * 仅仅用小于被测数平方根的质数来测试
     *
     * @param list
     * @param p
     * @param <A>
     * @return
     */
    public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {
        int i = 0;
        for (A a : list) {
            if (!p.test(a)) {
                return list.subList(0, i);
            }
            i++;
        }
        return list;
    }

    /**
     * 测试质数收集器
     *
     * @param args
     */
    public static void main(String[] args) {
        int n = 20;
        Map<Boolean, List<Integer>> res = IntStream.rangeClosed(2, n)
                .boxed()
                .collect(new PrimeNumbersCollector());
        res.forEach((k, v) -> System.out.println(k + ": " + v));
    }

}

运行结果

1
2
false: [4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20]
true: [2, 3, 5, 7, 11, 13, 17, 19]

改造

因为finisher是恒等函数,所以我们把实现 PrimeNumbersCollector核心逻辑的三个函数传给collect方法的重载版本来获得同样的结果

collect:

1
2
3
<R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

注意这里的combiner是BiConsumer<R, R>变量,Collector接口里是BinaryOperator<A> combiner()

因为这里是恒等函数,原位操作即可。

改造后的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.eh.eden.java8.demo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.IntStream;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/10
 */
public class PrimeDemo {
    /**
     * 使用质数作为除数 来判断一个数是否是质数 (非质数本身就不是质数无需判断)
     *
     * @param primes    在添加candidate之前的那个集合
     * @param candidate
     * @return
     */
    public static boolean isPrime(List<Integer> primes, int candidate) {
        int candidateRoot = (int) Math.sqrt((double) candidate);
        return takeWhile(primes, i -> i <= candidateRoot).stream()
                .noneMatch(i -> candidate % i == 0);
    }

    /**
     * 仅仅用小于被测数平方根的质数来测试
     *
     * @param list
     * @param p
     * @param <A>
     * @return
     */
    public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {
        int i = 0;
        for (A a : list) {
            if (!p.test(a)) {
                return list.subList(0, i);
            }
            i++;
        }
        return list;
    }

    /**
     * 测试质数收集器
     *
     * @param args
     */
    public static void main(String[] args) {
        int n = 20;
        Map<Boolean, List<Integer>> res = IntStream.rangeClosed(2, n)
                .boxed()
                .collect(
                        () -> new HashMap<Boolean, List<Integer>>() {
                            {
                                put(true, new ArrayList<>());
                                put(false, new ArrayList<>());
                            }
                        },
                        (acc, candidate) -> acc.get(isPrime(acc.get(true), candidate)).add(candidate),
                        (m1, m2) -> {
                            m1.get(true).addAll(m2.get(true));
                            m1.get(false).addAll(m2.get(false));
                        }
                );
        res.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}

// 运行结果
false: [4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20]
true: [2, 3, 5, 7, 11, 13, 17, 19]

这样就可以避免为实现Collector接口创建一个全新的类;得到的代码更紧凑,虽然 可能可读性会差一点,可重用性会差一点。

小结

以下是你应从本章中学到的关键概念。

  • collect是一个终端操作,它接受的参数是将流中元素累积到汇总结果的各种方式(称为收集器)。
  • 预定义收集器包括将流元素归约和汇总到一个值,例如计算最小值、最大值或平均值。
  • 预定义收集器可以用groupingBy对流中元素进行分组,或用partitioningBy进行分区。
  • 收集器可以高效地复合起来,进行多级分组、分区和归约。
  • 你可以实现Collector接口中定义的方法来开发你自己的收集器。