通过前两讲的内容可以知道,Reactor 框架为我们提供了各种操作符,使用这些操作符可以高效地操作 Flux 和 Mono 对象。Reactor 中的操作符可以分成不同的类型,上一讲我们关注转换、过滤和组合类的操作符,而今天我将继续为你介绍剩余的条件、裁剪、工具类的操作符。
条件操作符
所谓条件操作符,本质上就是提供了一个判断的依据来确定是否处理流中的元素。Reactor 中常用的条件操作符有 defaultIfEmpty、takeUntil、takeWhile、skipUntil 和 skipWhile 等,下面我将分别介绍。
defaultIfEmpty 操作符
defaultIfEmpty 操作符针对空数据流提供了一个简单而有用的处理方法。该操作符用来返回来自原始数据流的元素,如果原始数据流中没有元素,则返回一个默认元素。
defaultIfEmpty 操作符在实际开发过程中应用广泛,通常用在对方法返回值的处理上。如下所示的就是在 Controller 层中对 Service 层返回结果的一种常见处理方法。
1
2
3
4
5
6
7
8
|
@GetMapping("/orders/{id}")
public Mono<ResponseEntity<Order>> findOrderById(@PathVariable
String id) {
return orderService.findOrderById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity
.status(404).body(null));
}
|
可以看到,这里使用 defaultIfEmpty 操作符实现默认返回值。在示例代码所展示的 HTTP 端点中,当找不到指定的数据时,我们可以通过 defaultIfEmpty 方法返回一个空对象以及 404 状态码。
takeUntil/takeWhile 操作符
takeUntil 操作符的基本用法是 takeUntil (Predicate<? super T> predicate),其中 Predicate 代表一种断言条件,该操作符将从数据流中提取元素直到断言条件返回 true。takeUntil 的示例代码如下所示,我们希望从一个包含 100 个连续元素的序列中获取 1~10 个元素。
1
2
|
Flux.range(1, 100).takeUntil(i -> i == 10)
.subscribe(System.out::println);
|
类似的,takeWhile 操作符的基本用法是 takeWhile (Predicate<? super T> continuePredicate),其中 continuePredicate 代表的也是一种断言条件。与 takeUntil 不同的是,takeWhile 会在 continuePredicate 条件返回 true 时才进行元素的提取。takeWhile 的示例代码如下所示,这段代码的执行效果与 takeUntil 的示例代码一致。
1
2
|
Flux.range(1, 100).takeWhile(i -> i <= 10)
.subscribe(System.out::println);
|
讲到这里,让我们回顾上一讲介绍的第一个转换操作符 buffer。事实上,Reactor 框架中同样也提供了 bufferUntil 和 bufferWhile 操作符来实现数据收集,这两个操作符用到了和 takeUntil/takeWhile 完全一样的断言机制,如下代码演示了 bufferUntil 的使用方法。
1
2
|
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0)
.subscribe(System.out::println);
|
以上代码的执行结果如下所示,这里所设置的断言就是“i % 2 == 0”这一条件。
1
2
3
4
5
|
[1, 2]
[3, 4]
[5, 6]
[7, 8]
[9, 10]
|
对应的,bufferWhile 的使用方法和执行结果分别如下所示。
1
2
|
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0)
.subscribe(System.out::println);
|
[2]
[4]
[6]
[8]
[10]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
skipUntil/skipWhile 操作符
与 takeUntil 相对应,skipUntil 操作符的基本用法是 skipUntil (Predicate<? super T> predicate)。skipUntil 将丢弃原始数据流中的元素直到 Predicate 返回 true。
同样,与 takeWhile 相对应,skipWhile 操作符的基本用法是 skipWhile (Predicate<? super T> continuePredicate),当 continuePredicate 返回 true 时才进行元素的丢弃。这两个操作符都很简单,就不具体展开讨论了。
下面来说说裁剪操作符。
裁剪操作符
裁剪操作符通常用于统计流中的元素数量,或者检查元素是否具有一定的属性。在 Reactor 中,常用的裁剪操作符有 any 、concat、count 和 reduce 等。
any 操作符
any 操作符用于检查是否至少有一个元素具有所指定的属性,示例代码如下。
|
Flux.just(3, 5, 7, 9, 11, 15, 16, 17)
.any(e -> e % 2 == 0)
.subscribe(isExisted -> System.out.println(isExisted));
1
2
3
4
|
在这个 Flux 流中存在一个元素 16 可以被 2 除尽,所以控制台将输出“true”。
类似的,Reactor 中还存在一个 all 操作符,用来检查流中元素是否都满足同一属性,示例代码如下所示。
|
Flux.just(“abc”, “ela”, “ade”, “pqa”, “kang”)
.all(a -> a.contains(“a”))
.subscribe(isAllContained -> System.out.println(isAllContained));
1
2
3
4
5
6
7
8
|
显然,在这个 Flux 流中所有元素都包含了字符“a”,所以控制台也将输出“true”。
concat 操作符
concat 操作符用来合并来自不同 Flux 的数据。与上一讲中所介绍的 merge 操作符不同,这种合并采用的是顺序的方式,所以严格意义上并不是一种合并操作,所以我们把它归到裁剪操作符类别中。
例如,如果执行下面这段代码,我们将在控制台中依次看到 1 到 10 这 10 个数字。
|
Flux.concat(
Flux.range(1, 3),
Flux.range(4, 2),
Flux.range(6, 5)
).subscribe(System.out::println);
};
1
2
3
4
5
6
7
8
|
reduce 操作符
裁剪操作符中最经典的就是这个 reduce 操作符。reduce 操作符对来自 Flux 序列中的所有元素进行累积操作并得到一个 Mono 序列,该 Mono 序列中包含了最终的计算结果。reduce 操作符示意图如下所示。
reduce 操作符示意图(来自 Reactor 官网)
在上图中,具体的累积计算很简单,我们也可以通过一个 BiFunction 来实现任何自定义的复杂计算逻辑。reduce 操作符的示例代码如下所示,这里的 BiFunction 就是一个求和函数,用来对 1 到 10 的数字进行求和,运行结果为 55。
|
Flux.range(1, 10).reduce((x, y) -> x + y)
.subscribe(System.out::println);
1
2
|
与 reduce 操作符类似的还有一个 reduceWith 操作符,用来在 reduce 操作时指定一个初始值。reduceWith 操作符的代码示例如下所示,我们使用 5 来初始化求和过程,显然得到的结果将是 60。
|
Flux.range(1, 10).reduceWith(() -> 5, (x, y) -> x + y)
.subscribe(System.out::println);
1
2
3
4
5
6
7
8
9
10
|
以上就是三种裁剪操作符的介绍,应该很好理解,下面我们来看看工具操作符。
工具操作符
Reactor 中常用的工具操作符有 subscribe、timeout、block、log 和 debug 等。
subscribe 操作符
说起 subscribe 操作符,我已经在“06 | 流式操作:如何使用 Flux 和 Mono 高效构建响应式数据流”中讲到订阅响应式流时介绍过很多,这里再带你回顾一下通过该操作符订阅序列的最通用方式,如下所示。
|
//订阅序列的最通用方式,可以为我们的Subscriber实现提供所需的任意行为
subscribe(Subscriber subscriber);
1
2
|
基于这种方式,如果默认的 subscribe() 方法没有提供所需的功能,我们可以实现自己的 Subscriber。一般而言,我们总是可以直接实现响应式流规范所提供的 Subscriber 接口,并将其订阅到流。实现一个自定义的 Subscriber 并没有想象中那么困难,这里我给你演示一个简单的实现示例。
|
Subscriber subscriber = new Subscriber() {
volatile Subscription subscription;
public void onSubscribe(Subscription s) {
subscription = s;
System.out.println(“initialization”);
subscription.request(1);
}
public void onNext(String s) {
System.out.println(“onNext:” + s);
subscription.request(1);
}
public void onComplete() {
System.out.println(“onComplete”);
}
public void onError(Throwable t) {
System.out.println(“onError:” + t.getMessage());
}
};
1
2
3
4
5
6
7
8
9
10
|
在这个自定义 Subscriber 实现中,我们首先持有对订阅令牌 Subscription 的引用。由于订阅和数据处理可能发生在不同的线程中,因此我们使用 volatile 关键字来确保所有线程都具有对 Subscription 实例的正确引用。
当订阅到达时,我们会通过 onSubscribe 回调通知 Subscriber。在这里,我们保存订阅令牌并初始化请求。
你应该注意到,在 onNext 回调中,我们打印接收到的数据并请求下一个元素。在这种情况下,我们执行 subscription.request(1) 方法,也就是说使用简单的拉模型来管理背压。
剩下的 onComplete 和 onError 方法我们都只是打印了一下日志。
现在,让我们通过 subscribe() 方法来使用这个 Subscriber,如下所示。
|
Flux flux = Flux.just(“12”, “23”, “34”);
flux.subscribe(subscriber);
initialization
onNext:12
onNext:23
onNext:34
onComplete
1
2
|
前面构建的自定义 Subscriber 虽然能够正常运作,但因为过于偏底层,因此并不推荐你使用。我们推荐的方法是扩展 Project Reactor 提供的 BaseSubscriber 类。在这种情况下,订阅者可能如下所示。
|
class MySubscriber extends BaseSubscriber {
public void hookOnSubscribe(Subscription subscription) {
System.out.println(“initialization”);
request(1);
}
public void hookOnNext(T value) {
System.out.println(“onNext:” + value);
request(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
|
可以看到这里使用了两个钩子方法:hookOnSubscribe(Subscription) 和 hookOnNext(T)。和这两个方法一起,我们可以重载诸如 hookOnError(Throwable)、hookOnCancel()、hookOnComplete() 等方法。
timeout 操作符
timeout 操作符非常简单,保持原始的流发布者,当特定时间段内没有产生任何事件时,将生成一个异常。
block 操作符
顾名思义,block 操作符在接收到下一个元素之前会一直阻塞。block 操作符常用来把响应式数据流转换为传统数据流。例如,使用如下方法将分别把 Flux 数据流和 Mono 数据流转变成普通的 List
<Order>
对象和单个的 Order 对象,我们同样可以设置 block 操作的等待时间。
|
public List getAllOrders() {
return orderservice.getAllOrders()
.block(Duration.ofSecond(5));
}
public Order getOrderById(Long orderId) {
return orderservice.getOrderById(orderId)
.block(Duration.ofSecond(2));
}
1
2
3
4
|
log 操作符
Reactor 中专门提供了针对日志的工具操作符 log,它会观察所有的数据并使用日志工具进行跟踪。我们可以通过如下代码演示 log 操作符的使用方法,在 Flux.just() 方法后直接添加 log() 函数。
|
Flux.just(1, 2).log().subscribe(System.out::println);
1
2
|
以上代码的执行结果如下所示(为了显示简洁,部分内容和格式做了调整)。通常,我们也可以在 log() 方法中添加参数来指定日志分类的名称。
|
Info: | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
Info: | request(unbounded)
Info: | onNext(1)
1
Info: | onNext(2)
2
Info: | onComplete()
1
2
3
4
5
6
|
debug 操作符
在“01 | 追本溯源:响应式编程究竟是一种什么样的技术体系”中,我们已经提到基于回调和异步的实现方式比较难以调整。响应式编程也是一样,这也是它与传统编程方式之间一个很大的差异点。
为此,Reactor 框架的设计者也考虑到了普通开发人员的诉求,并开发了专门用于 debug 的操作符。要想启动调试模式,我们需要在程序开始的地方添加如下代码。
|
Hooks.onOperator(providedHook ->
providedHook.operatorStacktrace())
1
2
3
4
|
现在,所有的操作符在执行时都会保存与执行过程相关的附加信息。而当系统出现异常时,这些附加信息就相当于系统异常堆栈信息的一部分,方便开发人员进行问题的分析和排查。
上述做法是全局性的,如果你只想观察某个特定的流,那么就可以使用检查点(checkpoint)这一调试功能。例如以下代码演示了如何通过检查点来捕获 0 被用作除数的场景,我们在代码中添加了一个名为“debug”的检查点。
|
Mono.just(0).map(x -> 1 / x)
.checkpoint(“debug”).subscribe(System.out::println);
Exception in thread “main” reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
…
Assembly trace from producer [reactor.core.publisher.MonoMap] :
reactor.core.publisher.Mono.map(Mono.java:2029)
com.jianxiang.reactor.demo.Debug.main(Debug.java:10)
Error has been observed by the following operator(s):
|_ Mono.map(Debug.java:10)
|_ Mono.checkpoint(Debug.java:10)
Suppressed: reactor.core.publisher.FluxOnAssembly$AssemblySnapshotException: zero
at reactor.core.publisher.MonoOnAssembly.(MonoOnAssembly.java:55)
at reactor.core.publisher.Mono.checkpoint(Mono.java:1304)
… 1 more
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
可以看到,这个检查点信息会包含在异常堆栈中。根据需要在系统的关键位置上添加自定义的检查点,也是我们日常开发过程中的一种最佳实践。
小结与预告
好了,这一讲内容就介绍到这。承接上一讲的 Reactor 框架所提供的操作符,这一讲我分别就条件操作符、裁剪操作符以及各种工具操作符展开了详细的说明。在日常开发过程中,这些操作符都比较常见,能够加速我们开发响应式系统的开发过程。
这里依然给你留一道思考题:在 Reactor 中,如何自己实现一个 Subscriber?
那么介绍完 Spring 内置的 Reactor 框架之后,从下一讲开始,我们要讨论在 Spring 中使用这一框架来实现响应式组件的具体过程,首先要说的就是全新的 WebFlux 组件。下一讲,我们将详细分析 WebFlux 与传统 WebMVC 之间的区别,希望会带给你新的思路,我们到时见。
点击链接,获取课程相关代码↓↓↓https://github.com/lagoueduCol/ReactiveProgramming-jianxiang.git
\--- ### 精选评论 ##### \*\*风: > 博主,能否帮助大家理解一下它的实现,因为如果只是说这些个api的使用的话,看看文档也就会了,个人觉得,reactiveStream难的是理解,不是这些api的使用。从一个简单的数据流开始,订阅,推,拉,背压,什么情况下切换了线程。 ###### 讲师回复: > 这门课还是比较面向入门的,通过案例来理解框架的应用方式,至于框架的实现原理规划上不会讲得太多 ##### \*\*敏: > 这些调试手段,生产环境发布时,是不是就要去掉才好 ##### \*\*明: > 老师的react是哪个版本的?我的只有Hooks.onOperatorDebug()方法,没有Hooks.onOperator,而且加了checkpoint后异常输出也没有具体发生在哪个操作符,我的是3.4.4版本的。reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zeroCaused by: java.lang.ArithmeticException: / by zero at react.FluxDemo3.lambda$main$7(FluxDemo3.java:93) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:Error has been observed at the following site(s): |\_ checkpoint ⇢ debugStack trace: at react.FluxDemo3.lambda$main$7(FluxDemo3.java:93) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113) ###### 讲师回复: > 我用的Spring Boot是2.1.13.RELEASE,对应的Reactor版本好像是3.3.X版本的
|