上一讲,我们引入了 Spring 框架中专门用于构建响应式 Web 服务的 WebFlux 框架,同时我也给出了两种创建 RESTful 风格 HTTP 端点实现方法中的一种,即注解编程模型。今天,我将介绍另一种实现方法——如何使用函数式编程模型创建响应式 RESTful 服务,这种编程模型与传统的基于 Spring MVC 构建 RESTful 服务的方法有较大差别。

WebFlux 函数式编程模型

在引入函数式编程模型之前,让我先带你回顾一下 Spring WebFlux 的系统架构图,如下所示。

Spring WebFlux 架构图(来自 Spring 官网)

在上图的后半部分,你可以看到在 Spring WebFlux 中,函数式编程模型的核心概念是 Router Functions,对标 Spring MVC 中的 @Controller、@RequestMapping 等标准注解。而 Router Functions 则提供一套函数式风格的 API,其中最重要的就是 Router 和 Handler 接口。我们可以简单把 Router 对应成 RequestMapping,把 Controller 对应为 Handler。

当我发起一个远程调用,传入的 HTTP 请求由 HandlerFunction 处理, HandlerFunction 本质上是一个接收 ServerRequest 并返回 Mono 的函数。ServerRequest 和 ServerResponse 是一对不可变接口,用来提供对底层 HTTP 消息的友好访问。在介绍具体的实现案例之前,让我先从这两个接口开始讲起。

ServerRequest

ServerRequest 代表请求对象,可以访问各种 HTTP 请求元素,包括请求方法、URI 和参数,以及通过单独的 ServerRequest.Headers 获取 HTTP 请求头信息。ServerRequest 通过一系列 bodyToMono() 和 bodyToFlux() 方法提供对请求消息体进行访问的途径。例如,如果我们希望将请求消息体提取为 Mono 类型的对象,可以使用如下方法。

1
Mono<String> string = request.bodyToMono(String.class);

而如果我们希望将请求消息体提取为 Flux 类型的对象,可以使用如下方法,其中 Order 是可以从请求消息体反序列化的实体类。

1
Flux<Order> order = request.bodyToFlux(Order.class);

上述的 bodyToMono() 和 bodyToFlux() 两个方法实际上是通用的 ServerRequest.body(BodyExtractor) 工具方法的快捷方式,该方法如下所示。

1
<T> T body(BodyExtractor<T, ? super ServerHttpRequest> extractor);

BodyExtractor 是一种请求消息体的提取器,允许我们编写自己的提取逻辑。请注意 BodyExtractor 提取的对象是一个 ServerHttpRequest 类型的实例,而这个 ServerHttpRequest 是非阻塞的,与之对应的还有一个 ServerHttpResponse 对象。响应式 Web 操作的正是这组非阻塞的 ServerHttpRequest 和 ServerHttpResponse 对象,而不再是 Spring MVC 里的传统 HttpServletRequest 和 HttpServletResponse 对象。

当然,如果我们不需要实现定制化的提取逻辑,就可以使用框架为我们提供的常用的 BodyExtractors 实例。通过 BodyExtractors,上面的例子可以替换为以下形式。

1
2
3
4
5
Mono<String> string = 
	request.body(BodyExtractors.toMono(String.class);
	 
Flux<Person> Order= 
	request.body(BodyExtractors.toFlux(Order.class);

ServerResponse

与ServerRequest 对应,ServerResponse 提供对 HTTP 响应的访问。由于它是不可变的,因此我们可以使用构建器创建一个新的 ServerResponse。构建器允许设置响应状态、添加响应标题并提供响应的具体内容。例如,下面的示例演示了如何通过 ok() 方法创建代表 200 状态码的响应,其中我将响应体的类型设置为 JSON 格式,而响应的具体内容是一个 Mono 对象。

1
2
3
Mono<Order> order = …;
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
     .body(order);

通过 body() 方法来加载响应内容是构建 ServerResponse 最常见的方法,这里我们将 Order 对象作为返回值。如果想要返回各种类型的对象,我们也可以使用 BodyInserters 工具类所提供的构建方法,如常见的 fromObject() 和 fromPublisher() 方法等。以下示例代码中,我们通过 fromObject() 方法直接返回一个 “Hello World”。

1
ServerResponse.ok().body(BodyInserters.fromObject("Hello World"));

上述方法的背后实际上是利用 BodyBuilder 接口中的一组 body() 方法,来构建一个 ServerResponse 对象,典型的 body() 方法如下所示。

1
Mono<ServerResponse> body(BodyInserter<?, ? super ServerHttpResponse> inserter);

这里我们同样看到了非阻塞式的 ServerHttpResponse 对象。这种 body() 方法比较常见的用法是返回新增和更新操作的结果,你在本讲后续的内容中将会看到这种使用方法。

HandlerFunction

将 ServerRequest 和 ServerResponse 组合在一起就可以创建 HandlerFunction。HandlerFunction 也是一个接口,定义如下。

1
2
3
4
public interface HandlerFunction<T extends ServerResponse> {
 
    Mono<T> handle(ServerRequest request);
}

我们可以通过实现 HandlerFunction 接口中的 handle() 方法来创建定制化的请求响应处理机制。例如,以下所示的是一个简单的“Hello World”处理函数代码示例。

1
2
3
4
5
6
7
8
9
public class HelloWorldHandlerFunction implements 
	HandlerFunction<ServerResponse> {
 
        @Override
        public Mono<ServerResponse> handle(ServerRequest request) {
            return ServerResponse.ok().body(
	BodyInserters.fromObject("Hello World"));
        }
};

可以看到,这里使用了前面介绍的 ServerResponse 所提供的 body() 方法返回一个 String 类型的消息体。

通常,针对某个领域实体都存在 CRUD 等常见的操作,所以需要编写多个类似的处理函数,比较烦琐。这时候就推荐将多个处理函数分组到一个专门的 Handler 类中。在本讲的后面我同样会演示这种实现方法。

RouterFunction

现在,我们已经可以通过 HandlerFunction 创建请求的处理逻辑,接下来需要把具体请求与这种处理逻辑关联起来,RouterFunction 可以帮助我们实现这一目标。RouterFunction 与传统 SpringMVC 中的 @RequestMapping 注解功能类似。

创建 RouterFunction 的最常见做法是使用如下所示的 route 方法,该方法通过使用请求谓词和处理函数创建一个 ServerResponse 对象。

1
2
3
4
5
public static <T extends ServerResponse> RouterFunction<T> route(
            RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
 
        return new DefaultRouterFunction<>(predicate, handlerFunction);
}

RouterFunction 的核心逻辑位于这里的 DefaultRouterFunction 类中,该类的 route() 方法如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Mono<HandlerFunction<T>> route(ServerRequest request) {
            if (this.predicate.test(request)) {
                if (logger.isTraceEnabled()) {
                    String logPrefix = request.exchange().getLogPrefix();
                    logger.trace(logPrefix + String.format("Matched %s", this.predicate));
                }
                return Mono.just(this.handlerFunction);
            }
            else {
                return Mono.empty();
            }
}

可以看到,该方法将传入的 ServerRequest 路由到具体的处理函数 HandlerFunction。如果请求与特定路由匹配,则返回处理函数的结果,否则就返回一个空的 Mono。

RequestPredicates 工具类提供了常用的谓词,能够实现包括基于路径、HTTP 方法、内容类型等条件的自动匹配。一个简单的 RouterFunction 示例如下,我们用它来实现对 “/hello-world"请求路径的自动路由,这里用到了前面创建的 HelloWorldHandlerFunction。

1
2
RouterFunction<ServerResponse> helloWorldRoute =                RouterFunctions.route(RequestPredicates.path("/hello-world"),
            new HelloWorldHandlerFunction());

类似的,我们应该把 RouterFunction 和各种 HandlerFunction 按照需求结合起来一起使用,常见的做法也是根据领域对象来设计对应的 RouterFunction。

路由机制的优势在于它的组合型。两个路由功能可以组合成一个新的路由功能,并通过一定的评估方法路由到其中任何一个处理函数。如果第一个路由的谓词不匹配,则第二个谓词会被评估。请注意组合的路由器功能会按照顺序进行评估,因此在通用功能之前放置一些特定功能是一项最佳实践。在 RouterFunction 中,同样提供了对应的组合方法来实现这一目标,请看下面的代码。

1
2
3
4
5
6
7
default RouterFunction<T> and(RouterFunction<T> other) {
        return new RouterFunctions.SameComposedRouterFunction<>(this, other);
}
 
default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
        return and(RouterFunctions.route(predicate, handlerFunction));
}

我们可以通过调用上述两个方法中的任意一个来组合两个路由功能,其中后者相当于 RouterFunction.and() 方法与 RouterFunctions.route() 方法的集成。以下代码演示了 RouterFunctions 的组合特性。

1
2
3
4
RouterFunction<ServerResponse> personRoute =
        route(GET("/orders/{id}").and(accept(APPLICATION_JSON)), personHandler::getOrderById)
.andRoute(GET("/orders").and(accept(APPLICATION_JSON)), personHandler::getOrders)
.andRoute(POST("/orders").and(contentType(APPLICATION_JSON)), personHandler::createOrder);

RequestPredicates 工具类所提供的大多数谓词也具备组合特性。例如, RequestPredicates.GET(String) 方法的实现如下所示。

1
2
3
public static RequestPredicate GET(String pattern) {
        return method(HttpMethod.GET).and(path(pattern));
}

可以看到,该方法是 RequestPredicates.method(HttpMethod.GET) 和 RequestPredicates.path(String) 的组合。我们可以通过调用 RequestPredicate.and(RequestPredicate) 方法或 RequestPredicate.or(RequestPredicate) 方法来构建复杂的请求谓词。

案例集成:ReactiveSpringCSS 中的 Web 服务

现在回到 ReactiveSpringCSS 案例,我们已经明确了在案例系统中,customer-service 分别需要访问 account-service 和 order-service 服务中的 Web 服务。在上一讲的内容中,我们已经基于注解编程模型实现了 account-service 中的 AccountController。今天我将继续给你演示 order-service 中 Web 服务的实现过程。

基于函数式编程模型,在 order-service 中,我们编写一个 OrderHandler 专门实现根据 OrderNumber 获取 Order 领域实体的处理函数,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Configuration
public class OrderHandler {
 
    @Autowired
    private OrderService orderService;
 
    public Mono<ServerResponse> getOrderByOrderNumber(ServerRequest request) {
        String orderNumber = request.pathVariable("orderNumber");

        return ServerResponse.ok().body(this.orderService.getOrderByOrderNumber(orderNumber), Order.class);
    }
}

在上述代码示例中,我们创建了一个 OrderHandler 类,然后注入 OrderService 并实现了一个 getOrderByOrderNumber() 处理函数。

现在我们已经具备了 OrderHandler,就可以创建对应的 OrderRouter 了,示例代码如下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Configuration
public class OrderRouter {
 
    @Bean
    public RouterFunction<ServerResponse> routeOrder(OrderHandler orderHandler) {

        return RouterFunctions.route(
                RequestPredicates.GET("/orders/{orderNumber}")
                    .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                    orderHandler::getOrderByOrderNumber);
    }
}

在这个示例中,我们通过访问“/orders/{orderNumber}”端点就会自动触发 orderHandler 中的 getOrderByOrderNumber() 方法并返回相应的 ServerResponse。

接下来,假设我们已经分别通过远程调用获取了目标 Account 对象和 Order 对象,那么 generateCustomerTicket 方法的执行流程就可以明确了。基于响应式编程的实现方法,我们可以得到如下所示的示例代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Mono<CustomerTicket> generateCustomerTicket(String accountId, String orderNumber) {
 
        // 创建 CustomerTicket 对象
        CustomerTicket customerTicket = new CustomerTicket();
        customerTicket.setId("C_" + UUID.randomUUID().toString());
 
        // 从远程 account-service 获取 Account 对象
        Mono<AccountMapper> accountMapper = getRemoteAccountByAccountId(accountId);

        // 从远程 order-service 中获取 Order 对象
        Mono<OrderMapper> orderMapper = getRemoteOrderByOrderNumber(orderNumber);

        Mono<CustomerTicket> monoCustomerTicket = 
                Mono.zip(accountMapper, orderMapper).flatMap(tuple -> {
            AccountMapper account = tuple.getT1();
            OrderMapper order = tuple.getT2();

            if(account == null || order == null) {
                return Mono.just(customerTicket);
            }

            // 设置 CustomerTicket 对象属性
            customerTicket.setAccountId(account.getId());
            customerTicket.setOrderNumber(order.getOrderNumber());
            customerTicket.setCreateTime(new Date());
            customerTicket.setDescription("TestCustomerTicket");

            return Mono.just(customerTicket);
        });

        // 保存 CustomerTicket 对象并返回
        return monoCustomerTicket.flatMap(customerTicketRepository::save);
}

显然,这里的 getRemoteAccountById 和 getRemoteOrderByOrderNumber 方法都涉及了非阻塞式的远程 Web 服务的调用,这一过程我们将放在下一讲中详细介绍。

请注意,到这里时使用了 Reactor 框架中的 zip 操作符,将 accountMapper 流中的元素与 orderMapper 流中的元素按照一对一的方式进行合并,合并之后得到一个 Tuple2 对象。然后,我们再分别从这个 Tuple2 对象中获取 AccountMapper 和 OrderMapper 对象,并将它们的属性填充到所生成的 CustomerTicket 对象中。最后,我们通过 flatMap 操作符调用了 customerTicketRepository 的 save 方法完成了数据的持久化。这是 zip 和 flatMap 这两个操作符非常经典的一种应用场景,你需要熟练掌握。

小结与预告

好了,那么本讲内容就说到这。延续上一讲,我们接着讨论了 Spring WebFlux 的使用方法,并给出了基于函数式编程模型的 RESTful 端点创建方法。在这种开发模型中,开发人员需要重点把握 ServerRequest、ServerResponse、HandlerFunction 以及 RouterFunction 这四个核心对象的使用方法。

这里给你留一道思考题:你知道在 WebFlux 函数式编程模型中包含哪些核心编程对象吗?

现在,我们已经通过 WebFlux 构建了响应式 Web 服务,下一步就是如何来消费它们了。Spring 也专门提供了一个非阻塞式的 WebClient 工具类来完成这一目标,下一讲我就来和你系统地讨论这个工具类的使用方法,到时见。

点击链接,获取课程相关代码↓↓↓https://github.com/lagoueduCol/ReactiveProgramming-jianxiang.git

-– ### 精选评论 ##### *晨: > 老师你好,在项目中,背压怎么设置,设置到哪块 ######     讲师回复: >     就是使用一组onBackpressureXXX操作符,后面案例中会有示例