目录

附录C_如何以并发方式在同一个流上执行多种操作

Java 8中,流有一个非常大的(也可能是最大的)局限性,使用时,对它操作一次仅能得到 一个处理结果。实际操作中,如果你试图多次遍历同一个流,结果只有一个,那就是遭遇下面这 样的异常:

1
java.lang.IllegalStateException: stream has already been operated upon or closed

虽然流的设计就是如此,但我们在处理流时经常希望能同时获取多个结果。譬如,你可能会 用一个流来解析日志文件,就像我们在5.7.3节中所做的那样,而不是在某个单一步骤中收集多个 数据。或者,你想要维持菜单的数据模型,就像我们第4章到第6章用于解释流特性的那个例子, 你希望在遍历由“佳肴”构成的流时收集多种信息。

换句话说,你希望一次性向流中传递多个Lambda表达式。为了达到这一目标,你需要一个 fork类型的方法,对每个复制的流应用不同的函数。更理想的情况是你能以并发的方式执行这 些操作,用不同的线程执行各自的运算得到对应的结果。

不幸的是,这些特性目前还没有在Java 8的流实现中提供。不过,本附录会为你展示一种方 法,利用一个通用API ,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues 和Futures来实现这一大有裨益的特性。

本附录接下来介绍的实现基于Paul Sandoz向lambda-dev邮件列表http://mail.openjdk.java.net/pipermail/lambda-dev/ 2013-November/011516.html提供的解决方案。

复制流

要达到在一个流上并发地执行多个操作的效果, 你需要做的第一件事就是创建一个 StreamForker,这个StreamForker会对原始的流进行封装,在此基础之上你可以继续定义你 希望执行的各种操作。我们看看下面这段代码。

代码清单C-1 定义一个StreamForker,在一个流上执行多个操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 定义一个StreamForker,在一个流上执行多个操作
class StreamForker<T> {
    private final Stream<T> stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();


    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        forks.put(key, f); // 使用一个键对流上的函数进行索引
        return this; // 返回this从而保证多次流畅地调用fork方法
    }
    public Results getResults() {
    	// to be implemented
    }
}

这里的fork方法接受两个参数。

  • Function参数,它对流进行处理,将流转变为代表这些操作结果的任何类型。
  • key参数,通过它你可以取得操作的结果,并将这些键/函数对累积到一个内部的Map中。

fork方法返回StreamForker自身,因此,你可以通过复制多个操作构造一个流水线。图 C-1展示了StreamForker背后的主要思想。

https://gitee.com/lienhui68/picStore/raw/master/null/20200819195302.png

这里用户定义了希望在流上执行的三种操作,这三种操作通过三个键索引标识。StreamForker会遍历原始的流,并创建它的三个副本。这时就可以并行地在复制的流上执行这三种操 作,这些函数运行的结果由对应的键进行索引,最终会填入到结果的Map。

流的副本会使用自定义的流,使用阻塞队列进行遍历

所有由fork方法添加的操作的执行都是通过getResults方法的调用触发的,该方法返回一 个Results接口的实现,具体的定义如下:

1
2
3
interface Results {
    <R> R get(Object key);
}

这一接口只有一个方法,你可以将fork方法中使用的key对象作为参数传入,方法会返回该 键对应的操作结果。

使用ForkingStreamConsumer实现Results接口

你可以用下面的方式实现getResults方法:

1
2
3
4
5
6
7
8
9
public Results getResults() {
    ForkingStreamConsumer<T> consumer = build();
    try {
        stream.sequential().forEach(consumer);
    } finally {
        consumer.finish();
    }
    return consumer;
}

ForkingStreamConsumer同时实现了前面定义的Results接口和Consumer接口。随着我 们进一步剖析它的实现细节,你会看到它主要的任务就是处理流中的元素,将它们分发到多个 BlockingQueues中处理,BlockingQueues的数量和通过fork方法提交的操作数是一致的。 注意,我们很明确地知道流是顺序处理的,不过,如果你在一个并发流上执行forEach方法,它 的元素可能就不是顺序地被插入到队列中了。finish方法会在队列的末尾插入特殊元素表明该 队列已经没有更多需要处理的元素了。build方法主要用于创建ForkingStreamConsumer,详 细内容请参考下面的代码清单。

代码清单C-2 使用build方法创建ForkingStreamConsumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 使用build方法创建ForkingStreamConsumer
private ForkingStreamConsumer<T> build() {
    // 创建由队列组成的列表,每一个队列对应一个操作
    List<BlockingQueue<T>> queues = Lists.newArrayList();

    // 建立用于标识操作的键与包含操作结果的 Future 之间的映射关系
    Map<Object, Future<?>> actions = forks.entrySet().stream().reduce(
            new HashMap<>(),
            (map, e) -> {
                map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                return map;
            },
            (m1, m2) -> {
                m1.putAll(m2);
                return m1;
            }
    );
    return new ForkingStreamConsumer<>(queues, actions);
}

代码清单C-2中,你首先创建了我们前面提到的由BlockingQueues组成的列表。紧接着, 你创建了一个Map,Map的键就是你在流中用于标识不同操作的键,值包含在Future中,Future 中包含了这些操作对应的处理结果。BlockingQueues的列表和Future组成的Map会被传递给 ForkingStreamConsumer的构造函数。每个Future都是通过getOperationResult方法创建 的,代码清单如下。

代码清单C-3 使用getOperationResult方法创建Future

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 创建Future
private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> function) {
    // 为某一任务创建一个队列,并添加到队列的列表中
    BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    queues.add(queue);

    // 创建一个Spliterator,遍历队列中的元素
    Spliterator<T> spliterator = new BlockingQueueSpliterator(queue);
    // 创建一个流,将 Spliterator作为数据源
    Stream<T> source = StreamSupport.stream(spliterator, false);
    // 创建一个Future对象,以异步方式计算在流上执行特定函数的结果
    return CompletableFuture.supplyAsync(() -> function.apply(source));
}

getOperationResult方法会创建一个新的BlockingQueue,并将其添加到队列的列表。 这个队列会被传递给一个新的 BlockingQueueSpliterator 对象, 后者是一个延迟绑定的 Spliterator,它会遍历读取队列中的每个元素;我们很快会看到这是如何做到的。

接下来你创建了一个顺序流对该Spliterator进行遍历,最终你会创建一个Future在流上 执行某个你希望的操作并收集其结果。这里的Future使用CompletableFuture类的一个静态工 厂方法创建,CompletableFuture实现了Future接口。这是Java 8新引入的一个类,我们在第 11章对它进行过详细的介绍。

开发ForkingStreamConsumer和BlockingQueueSpliterator

还有两个非常重要的部分你需要实现,分别是前面提到过的ForkingStreamConsumer类和 BlockingQueueSpliterator类。你可以用下面的方式实现前者。

代码清单C-4 实现ForkingStreamConsumer类,为其添加处理多个队列的流元素

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 添加处理多个队列的流元素,并根据key获取Future中的结果
static class ForkingStreamConsumer<T> implements Consumer<T>, StreamForker.Results {

    static final Object END_OF_STREAM = new Object(); // 流遍历结束标志

    private final List<BlockingQueue<T>> queues;
    private final Map<Object, Future<?>> actions;

    public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        // 将流中遍历的元素添加到所有的队列中
        queues.forEach(q -> q.add(t));
    }

    // 将最后一个元素添加到队列中,表明该流已经结束
    void finish() {
        accept((T) END_OF_STREAM);
    }

    // 等待Future完成相关的计算, 返回由特定键标识的处理结果
    @Override
    public <R> R get(Object key) {
        try {
            return ((Future<R>) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

这个类同时实现了 Consumer 和 Results 接口, 并持有两个引用, 一个指向由 Blocking- Queues组成的列表,另一个是执行了由Future构成的Map结构,它们表示的是即将在流上执行 的各种操作。

Consumer接口要求实现accept方法。这里,每当ForkingStreamConsumer接受流中的一 个元素,它就会将该元素添加到所有的BlockingQueues中。另外,当原始流中的所有元素都添 加到所有队列后,finish方法会将最后一个元素添加所有队列。

Results接口需要实现get方法。一旦处理结束,get方法会获得Map中由键索引的Future, 解析处理的结果并返回。

最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator。每个BlockingQueueSpliterator 都持有一个指向 BlockingQueues 的引用, 这个 BlockingQueues 是由 ForkingStreamConsumer 生 成 的 , 你 可 以 用 下 面 这 段 代 码 清 单 类 似 的 方 法 实 现 一 个 BlockingQueueSpliterator。

代码清单C-5 一个遍历BlockingQueue并读取其中元素的Spliterator

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class BlockingQueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> q;

    public BlockingQueueSpliterator(BlockingQueue<T> q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        T t;
        while (true) { // 中断后继续take数据
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {
                // 中断异常
            }
        }
        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

这段代码实现了一个Spliterator,不过它并未定义如何切分流的策略,仅仅利用了流的 延迟绑定能力。由于这个原因,它也没有实现trySplit方法。

由于无法预测能从队列中取得多少个元素,所以estimatedSize方法也无法返回任何有意 义的值。更进一步,由于你没有试图进行任何切分,所以这时的估算也没什么用处。

这一实现并没有体现表7-2中列出的Spliterator的任何特性,因此characteristic方法 返回0。

这段代码中提供了实现的唯一方法是tryAdvance,它从BlockingQueue中取得原始流中的 元素,而这些元素最初由ForkingSteamConsumer添加。依据getOperationResult方法创建 Spliterator同样的方式,这些元素会被作为进一步处理流的源头传递给Consumer对象(在流 上要执行的函数会作为参数传递给某个fork方法调用)。tryAdvance方法返回true通知调用方 还有其他的元素需要处理,直到它发现由ForkingSteamConsumer添加的特殊对象,表明队列 中已经没有更多需要处理的元素了。图C-2展示了StreamForker及其构建模块的概述。

https://gitee.com/lienhui68/picStore/raw/master/null/20200819202203.png

这幅图中,左上角的StreamForker中包含一个Map结构,以方法的形式定义了流上要执行 的操作,这些方法分别由对应的键索引。右边的ForkingStreamConsumer为每一种操作的对象 维护了一个队列,原始流中的所有元素会被分发到这些队列中。

图的下半部分,每一个队列都有一个BlockingQueueSpliterator从队列中提取元素作为 各个流处理的源头。最后,由原始流复制创建的每个流,都会被作为参数传递给某个处理函数, 执行对应的操作。至此,你已经实现了StreamForker所有组件,可以开始工作了。

将StreamForker运用于实战

我们将StreamForker应用到第4章中定义的menu数据模型上,希望对它进行一些处理。通 过复制原始的菜肴(dish)流,我们想以并发的方式执行四种不同的操作,代码清单如下所示。 这尤其适用于以下情况:你想要生成一份由逗号分隔的菜肴名列表,计算菜单的总热量,找出热 量最高的菜肴,并按照菜的类型对这些菜进行分类。

代码清单C-6 将StreamForker运用于实战

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
    // 测试
    Stream<Dish> menuStream = menu.stream();
    StreamForker.Results results = new StreamForker<>(menuStream)
            // 生成一份由逗号分隔的菜肴名列表
            .fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
            // 计算菜单的总热量
            .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
            // 找出热量最高的菜肴
            .fork("mostCaloricDish", s -> s.collect(reducing(
                    (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2))
                    .get())
            // 按照菜的类型对这些菜进行分类
            .fork("dishesByType", s -> s.collect(groupingBy(Dish::getType))).getResults();

    String shortMenu = results.get("shortMenu");
    System.out.println(shortMenu);
    int totalCalories = results.get("totalCalories");
    Dish mostCaloricDish = results.get("mostCaloricDish");
    Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");

    System.out.println("Short menu: " + shortMenu);
    System.out.println("Total calories: " + totalCalories);
    System.out.println("Most caloric dish: " + mostCaloricDish);
    System.out.println("Dishes by type: " + dishesByType);
}

StreamForker提供了一种使用简便、结构流畅的API,它能够复制流,并对每个复制的流 施加不同的操作。这些应用在流上以函数的形式表示,可以用任何对象的方式标识,在这个例子 里,我们选择使用String的方式。如果你没有更多的流需要添加,可以调用StreamForker的 getResults方法,触发所有定义的操作开始执行,并取得StreamForker.Results。由于这些 操作的内部实现就是异步的,getResults方法调用后会立刻返回,不会等待所有的操作完成, 拿到所有的执行结果才返回。

你可以通过向StreamForker.Results接口传递标识特定操作的键取得某个操作的结果。 如果该时刻操作已经完成,get方法会返回对应的结果;否则,该方法会阻塞,直到计算结束, 取得对应的操作结果。

正如我们所预期的,这段代码会产生下面这些输出:

1
2
3
4
Short menu: pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon
Total calories: 4200
Most caloric dish: Dish(name=pork, vegetarian=false, calories=800, type=MEAT)
Dishes by type: {FISH=[Dish(name=prawns, vegetarian=false, calories=300, type=FISH), Dish(name=salmon, vegetarian=false, calories=450, type=FISH)], OTHER=[Dish(name=french fries, vegetarian=true, calories=530, type=OTHER), Dish(name=rice, vegetarian=true, calories=350, type=OTHER), Dish(name=season fruit, vegetarian=true, calories=120, type=OTHER), Dish(name=pizza, vegetarian=true, calories=550, type=OTHER)], MEAT=[Dish(name=pork, vegetarian=false, calories=800, type=MEAT), Dish(name=beef, vegetarian=false, calories=700, type=MEAT), Dish(name=chicken, vegetarian=false, calories=400, type=MEAT)]}

性能的考量

提起性能,你不应该想当然地认为这种方法比多次遍历流的方式更加高效。如果构成流的数据都保存在内存中,阻塞式队列所引发的开销很容易就抵消了由并发执行操作所带来的性能提升。

与此相反,如果操作涉及大量的I/O,譬如流的源头是一个巨型文件,那么单次访问流可能是个不错的选择;因此(大多数情况下)优化应用性能唯一有意义的规则是“好好地度量它”。

通过这个例子,我们展示了怎样一次性地在同一个流上执行多个操作。更重要地是,我们相 信这个例子也证明了一点,即使某个特性原生的Java API暂时还不支持,充分利用Lambda表达式 的灵活性和一点点的创意,整合现有的功能,你完全可以实现想要的新特性。

一般在工作中构成流的数据都保存在内存中,如果使用该方式有以下几个缺点

  1. 浪费空间,有几个任务就得创建几个队列
  2. 构建异步任务和使用阻塞队列会有开销,很容易抵消由并发执行操作所带来的性能提升
  3. 线程池共用异步通用线程池,容易被其他业务影响

完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package com.eh.eden.java8.demo;

import com.google.common.collect.Lists;
import lombok.Data;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.*;

/**
 * todo
 *
 * @author David Li
 * @create 2020/08/19
 */
public class StreamForkerDemo {
    static List<Dish> menu = Arrays.asList(
            new Dish("pork", false, 800, Dish.Type.MEAT),
            new Dish("beef", false, 700, Dish.Type.MEAT),
            new Dish("chicken", false, 400, Dish.Type.MEAT),
            new Dish("french fries", true, 530, Dish.Type.OTHER),
            new Dish("rice", true, 350, Dish.Type.OTHER),
            new Dish("season fruit", true, 120, Dish.Type.OTHER),
            new Dish("pizza", true, 550, Dish.Type.OTHER),
            new Dish("prawns", false, 300, Dish.Type.FISH),
            new Dish("salmon", false, 450, Dish.Type.FISH));

    public static void main(String[] args) {
        // 测试
        Stream<Dish> menuStream = menu.stream();
        StreamForker.Results results = new StreamForker<>(menuStream)
                // 生成一份由逗号分隔的菜肴名列表
                .fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
                // 计算菜单的总热量
                .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
                // 找出热量最高的菜肴
                .fork("mostCaloricDish", s -> s.collect(reducing(
                        (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2))
                        .get())
                // 按照菜的类型对这些菜进行分类
                .fork("dishesByType", s -> s.collect(groupingBy(Dish::getType))).getResults();

        String shortMenu = results.get("shortMenu");
        int totalCalories = results.get("totalCalories");
        Dish mostCaloricDish = results.get("mostCaloricDish");
        Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");

        System.out.println("Short menu: " + shortMenu);
        System.out.println("Total calories: " + totalCalories);
        System.out.println("Most caloric dish: " + mostCaloricDish);
        System.out.println("Dishes by type: " + dishesByType);
    }
}

@Data
class Dish {

    private String name;
    private boolean vegetarian;
    private int calories;
    private Type type;

    public Dish(String name, boolean vegetarian, int calories, Type type) {
        this.name = name;
        this.vegetarian = vegetarian;
        this.calories = calories;
        this.type = type;
    }

    public enum Type {MEAT, FISH, OTHER}
}

// 定义一个StreamForker,在一个流上执行多个操作
class StreamForker<T> {
    private final Stream<T> stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();


    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        forks.put(key, f); // 使用一个键对流上的函数进行索引
        return this; // 返回this从而保证多次流畅地调用fork方法
    }

    // 使用build方法创建ForkingStreamConsumer
    private ForkingStreamConsumer<T> build() {
        // 创建由队列组成的列表,每一个队列对应一个操作
        List<BlockingQueue<T>> queues = Lists.newArrayList();

        // 建立用于标识操作的键与包含操作结果的 Future 之间的映射关系
        Map<Object, Future<?>> actions = forks.entrySet().stream().reduce(
                new HashMap<>(),
                (map, e) -> {
                    map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                    return map;
                },
                (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                }
        );
        return new ForkingStreamConsumer<>(queues, actions);
    }

    // 创建Future
    private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> function) {
        // 为某一任务创建一个队列,并添加到队列的列表中
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        queues.add(queue);

        // 创建一个Spliterator,遍历队列中的元素
        Spliterator<T> spliterator = new BlockingQueueSpliterator(queue);
        // 创建一个流,将 Spliterator作为数据源
        Stream<T> source = StreamSupport.stream(spliterator, false);
        // 创建一个Future对象,以异步方式计算在流上执行特定函数的结果
        return CompletableFuture.supplyAsync(() -> function.apply(source));
    }

    public Results getResults() {
        ForkingStreamConsumer<T> consumer = build(); // 创建ForkingStreamConsumer
        try {
            stream.sequential().forEach(consumer); // 处理流中的元素
        } finally {
            consumer.finish(); // 在队列的末尾插入特殊元素表明该 队列已经没有更多需要处理的元素
        }
        return consumer;
    }

    interface Results {
        <R> R get(Object key);
    }

    // 添加处理多个队列的流元素,并根据key获取Future中的结果
    static class ForkingStreamConsumer<T> implements Consumer<T>, StreamForker.Results {

        static final Object END_OF_STREAM = new Object(); // 流遍历结束标志

        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
            this.queues = queues;
            this.actions = actions;
        }

        @Override
        public void accept(T t) {
            // 将流中遍历的元素添加到所有的队列中
            queues.forEach(q -> q.add(t));
        }

        // 将最后一个元素添加到队列中,表明该流已经结束
        void finish() {
            accept((T) END_OF_STREAM);
        }

        // 等待Future完成相关的计算, 返回由特定键标识的处理结果
        @Override
        public <R> R get(Object key) {
            try {
                return ((Future<R>) actions.get(key)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    class BlockingQueueSpliterator<T> implements Spliterator<T> {
        private final BlockingQueue<T> q;

        public BlockingQueueSpliterator(BlockingQueue<T> q) {
            this.q = q;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T t;
            while (true) { // 中断后继续take数据
                try {
                    t = q.take();
                    break;
                } catch (InterruptedException e) {
                    // 中断异常
                }
            }
            if (t != ForkingStreamConsumer.END_OF_STREAM) {
                action.accept(t);
                return true;
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}