在上一讲中,我已经带你在 ReactiveSpringCSS 案例系统中通过 WebFlux 创建了响应式 Web 服务,并给你留下了一道思考题:如何实现非阻塞式的跨服务调用?
我们知道在 Spring 中存在一个功能强大的工具类 RestTemplate,专门用来实现基于 HTTP 协议的远程请求和响应处理。RestTemplate 的主要问题在于不支持响应式流规范,也就无法提供非阻塞式的流式操作。Spring 5 全面引入响应式编程模型,同时也提供了 RestTemplate 的响应式版本,这就是 WebClient 工具类。
这一讲我们就针对 WebClient 展开详细的探讨。首先我会带你创建和配置 WebClient对象;然后使用 WebClient 来访问远程 Web 服务,并介绍该组件的一些使用技巧;最后我依然会结合 ReactiveSpringCSS 案例来给出与现有服务之间的集成过程。
创建和配置 WebClient
WebClient 类位于 org.springframework.web.reactive.function.client 包中,要想在项目中集成 WebClient 类,只需要引入 WebFlux 依赖即可。
创建 WebClient
创建 WebClient 有两种方法,一种是通过它所提供的 create() 工厂方法,另一种则是使用 WebClient Builder 构造器工具类。
我们可以直接使用 create() 工厂方法来创建 WebClient 的实例,示例代码如下所示。
1
|
WebClient webClient = WebClient.create();
|
如果我们创建 WebClient 的目的是针对某一个特定服务进行操作,那么就可以使用该服务的地址作为 baseUrl 来初始化 WebClient,示例代码如下所示。
1
2
|
WebClient webClient =
WebClient.create("https://localhost:8081/accounts");
|
WebClient 还附带了一个构造器类 Builder,使用方法也很简单,示例代码如下所示。
1
|
WebClient webClient = WebClient.builder().build();
|
配置 WebClient
创建完 WebClient 实例之后,我们还可以在 WebClient.builder() 方法中添加相关的配置项,来对 WebClient 的行为做一些控制,通常用来设置消息头信息等,示例代码如下所示。
1
2
3
4
5
|
WebClient webClient = WebClient.builder()
.baseUrl("https://localhost:8081/accounts")
.defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.defaultHeader(HttpHeaders.USER_AGENT, "Reactive WebClient")
.build();
|
上述代码展示了 defaultHeader 的使用方法,WebClient.builder() 还包含 defaultCookie、defaultRequest 等多个配置项可供使用。
现在,我们已经成功创建了 WebClient 对象,接下来就可以使用该对象来访问远程服务了。
使用 WebClient 访问服务
在远程服务访问上,WebClient 有几种常见的使用方式,包括非常实用的 retrieve() 和 exchange() 方法、用于封装请求数据的 RequestBody,以及表单和文件的提交。接下来我就对这些使用方式进行详细介绍并给出相关示例。
构造 URL
Web 请求中通过请求路径可以携带参数,在使用 WebClient 时也可以在它提供的 uri() 方法中添加路径变量和参数值。如果我们定义一个包含路径变量名为 id 的 URL,然后将 id 值设置为 100,那么就可以使用如下示例代码。
1
|
webClient.get().uri("http://localhost:8081/accounts/{id}", 100);
|
当然,URL 中也可以使用多个路径变量以及多个参数值。如下所示的代码中就定义了 URL 中拥有路径变量 param1 和 param2,实际访问的时候将被替换为 value1 和 value2。如果有很多的参数,只要按照需求对请求地址进行拼装即可。
1
|
webClient.get().uri("http://localhost:8081/account/{param1}/{ param2}", "value1", "value12");
|
同时,我们也可以事先把这些路径变量和参数值拼装成一个 Map 对象,然后赋值给当前 URL。如下所示的代码就定义了 Key 为 param1 和 param2 的 HashMap,实际访问时会从这个 HashMap 中获取参数值进行替换,从而得到最终的请求路径为http://localhost:8081/accounts/value1/value2,如下所示。
1
2
3
4
|
Map<String, Object> uriVariables = new HashMap<>();
uriVariables.put("param1", "value1");
uriVariables.put("param2", "value2");
webClient.get().uri("http://localhost:8081/accounts/{param1}/{param2}", variables);
|
我们还可以通过使用 URIBuilder 来获取对请求信息的完全控制,示例代码如下所示。
1
2
3
4
5
6
7
8
|
public Flux<Account> getAccounts(String username, String token) {
return webClient.get()
.uri(uriBuilder -> uriBuilder.path("/accounts").build())
.header("Authorization", "Basic " + Base64Utils
.encodeToString((username + ":" + token).getBytes(UTF_8)))
.retrieve()
.bodyToFlux(Account.class);
}
|
这里我们为每次请求添加了包含授权信息的“Authorization”消息头,用来传递用户名和访问令牌。一旦我们准备好请求信息,就可以使用 WebClient 提供的一系列工具方法完成远程服务的访问,例如上面示例中的 retrieve() 方法。
retrieve() 方法
retrieve() 方法是获取响应主体并对其进行解码的最简单方法,我们再看一个示例,如下所示。
1
2
3
4
5
6
7
|
WebClient webClient = WebClient.create("http://localhost:8081");
Mono<Account> result = webClient.get()
.uri("/accounts/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Account.class);
|
上述代码使用 JSON 作为序列化方式,我们也可以根据需要设置其他方式,例如采用 MediaType.TEXT_EVENT_STREAM 以实现基于流的处理,示例如下。
1
2
3
4
|
Flux<Order> result = webClient.get()
.uri("/accounts").accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(Account.class);
|
exchange() 方法
如果希望对响应拥有更多的控制权,retrieve() 方法就显得无能为力,这时候我们可以使用 exchange() 方法来访问整个响应结果,该响应结果是一个 ClientResponse 对象,包含了响应的状态码、Cookie 等信息,示例代码如下所示。
1
2
3
4
5
|
Mono<Account> result = webClient.get()
.uri("/accounts/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToMono(Account.class));
|
以上代码演示了如何对结果执行 flatMap() 操作符的实现方式,通过这一操作符调用 ClientResponse 的 bodyToMono() 方法以获取目标 Account 对象。
使用 RequestBody
如果你有一个 Mono 或 Flux 类型的请求体,那么可以使用 WebClient 的 body() 方法来进行编码,使用示例如下所示。
1
2
3
4
5
6
7
8
|
Mono<Account> accountMono = ... ;
Mono<Void> result = webClient.post()
.uri("/accounts")
.contentType(MediaType.APPLICATION_JSON)
.body(accountMono, Account.class)
.retrieve()
.bodyToMono(Void.class);
|
如果请求对象是一个普通的 POJO 而不是 Flux/Mono,则可以使用 syncBody() 方法作为一种快捷方式来传递请求,示例代码如下所示。
1
2
3
4
5
6
7
8
|
Account account = ... ;
Mono<Void> result = webClient.post()
.uri("/accounts")
.contentType(MediaType.APPLICATION_JSON)
.syncBody(account)
.retrieve()
.bodyToMono(Void.class);
|
表单和文件提交
当传递的请求体是一个 MultiValueMap 对象时,WebClient 默认发起的是表单提交。例如,针对用户登录场景,我们可以构建一个 MultiValueMap 对象,并传入参数 username 和 password 进行提交,代码如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
|
String baseUrl = "http://localhost:8081";
WebClient webClient = WebClient.create(baseUrl);
MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.add("username", "jianxiang");
map.add("password", "password");
Mono<String> mono = webClient.post()
.uri("/login")
.syncBody(map)
.retrieve()
.bodyToMono(String.class);
|
如果想提交 Multipart Data,我们可以使用 MultipartBodyBuilder 工具类来简化请求的构建过程。MultipartBodyBuilder 的使用方法如下所示,最终我们也将得到一个 MultiValueMap 对象。
1
2
3
4
5
6
|
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("paramPart", "value");
builder.part("filePart", new FileSystemResource("jianxiang.png"));
builder.part("accountPart", new Account("jianxiang"));
MultiValueMap<String, HttpEntity<?>> parts = builder.build();
|
一旦 MultiValueMap 构建完成,通过 WebClient 的 syncBody() 方法就可以实现请求提交,我已经在上文中提到过这种实现方法。
以上介绍的都是 WebClient 所提供的基础方法,我们可以使用这些方法来满足面向业务处理的常规请求。但如果你希望对远程调用过程有更多的控制,那么就需要使用 WebClient 所提供的的一些其他使用技巧了。让我们一起来看一下。
WebClient 的其他使用技巧
除了实现常规的 HTTP 请求之外,WebClient 还有一些高级用法,包括请求拦截和异常处理等。
请求拦截
和传统 RestTemplate 一样,WebClient 也支持使用过滤器函数。让我们回顾 WebClient 的构造器 Builder,你会发现 Builder 实际上是一个接口,它内置了一批初始化 Builder 和 WebClient 的方法定义。DefaultWebClientBuilder 就是该接口的默认实现,截取该类的部分核心代码如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
class DefaultWebClientBuilder implements WebClient.Builder {
@Override
public WebClient.Builder filter(ExchangeFilterFunction filter) {
Assert.notNull(filter, "ExchangeFilterFunction must not be null");
initFilters().add(filter);
return this;
}
…
@Override
public WebClient build() {
ExchangeFunction exchange = initExchangeFunction();
ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
.reduce(ExchangeFilterFunction::andThen)
.map(filter -> filter.apply(exchange))
.orElse(exchange) : exchange);
return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
unmodifiableCopy(this.defaultHeaders), unmodifiableCopy(this.defaultCookies),
new DefaultWebClientBuilder(this));
}
…
}
|
可以看到 build() 方法的目的是构建出一个 DefaultWebClient,而 DefaultWebClient 的构造函数中依赖于 ExchangeFunction 接口。我们来看一下 ExchangeFunction 接口的定义,其中的 filter() 方法传入并执行 ExchangeFilterFunction,如下所示。
1
2
3
4
5
6
7
8
|
public interface ExchangeFunction {
…
default ExchangeFunction filter(ExchangeFilterFunction filter) {
Assert.notNull(filter, "'filter' must not be null");
return filter.apply(this);
}
}
|
当我们看到 Filter(过滤器)这个单词时,思路上就可以触类旁通了。在 Web 应用程序中,Filter 体现的就是一种拦截器作用,而多个 Filter 组合起来构成一种过滤器链。ExchangeFilterFunction 也是一个接口,其部分核心代码如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public interface ExchangeFilterFunction {
Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);
default ExchangeFilterFunction andThen(ExchangeFilterFunction after) {
Assert.notNull(after, "'after' must not be null");
return (request, next) -> {
ExchangeFunction nextExchange = exchangeRequest -> after.filter(exchangeRequest, next);
return filter(request, nextExchange);
};
}
default ExchangeFunction apply(ExchangeFunction exchange) {
Assert.notNull(exchange, "'exchange' must not be null");
return request -> this.filter(request, exchange);
}
…
}
|
显然,ExchangeFilterFunction 通过 andThen() 方法将自身添加到过滤器链并实现 filter() 这个函数式方法。我们可以使用过滤器函数以任何方式拦截和修改请求,例如通过修改 ClientRequest 来调用 ExchangeFilterFucntion 过滤器链中的下一个过滤器,或者让 ClientRequest 直接返回以阻止过滤器链的进一步执行。
作为示例,如下代码演示了如何使用过滤器功能添加 HTTP 基础认证机制。
1
2
3
|
WebClient client = WebClient.builder()
.filter(basicAuthentication("username", "password"))
.build();
|
这样,基于客户端过滤机制,我们不需要在每个请求中添加 Authorization 消息头,过滤器将拦截每个 WebClient 请求并自动添加该消息头。再来看一个例子,我们将编写一个自定义的过滤器函数 logFilter(),代码如下所示。
1
2
3
4
5
6
7
8
|
private ExchangeFilterFunction logFilter() {
return (clientRequest, next) -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
.forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
return next.exchange(clientRequest);
};
}
|
显然,logFilter() 方法的作用是对每个请求做详细的日志记录。我们同样可以通过 filter() 方法把该过滤器添加到请求链路中,代码如下所示。
1
2
3
|
WebClient webClient = WebClient.builder()
.filter(logFilter())
.build();
|
异常处理
当发起一个请求所得到的响应状态码为 4XX 或 5XX 时,WebClient 就会抛出一个 WebClientResponseException 异常,我们可以使用 onStatus() 方法来自定义对异常的处理方式,示例代码如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
|
public Flux<Account> listAccounts() {
return webClient.get()
.uri("/accounts)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
Mono.error(new MyCustomClientException())
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
Mono.error(new MyCustomServerException())
)
.bodyToFlux(Account.class);
}
|
这里我们构建了一个 MyCustomServerException 来返回自定义异常信息。需要注意的是,这种处理方式只适用于使用 retrieve() 方法进行远程请求的场景,exchange() 方法在获取 4XX 或 5XX 响应的情况下不会引发异常。因此,当使用 exchange() 方法时,我们需要自行检查状态码并以合适的方式处理它们。
案例集成:基于 WebClient 的非阻塞式跨服务通信
介绍完 WebClient 类的使用方式之后,让我们回到 ReactiveSpringCSS 案例。在上一讲中,我已经给出了位于 customer-service 的 CustomerService 类中的 generateCustomerTicket 方法的代码结构,该方法用于完成与 account-service 和 order-service 进行集成,我们来回顾一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public Mono<CustomerTicket> generateCustomerTicket(String accountId, String orderNumber) {
// 创建 CustomerTicket 对象
CustomerTicket customerTicket = new CustomerTicket();
customerTicket.setId("C_" + UUID.randomUUID().toString());
// 从远程 account-service 获取 Account 对象
Mono<AccountMapper> accountMapper = getRemoteAccountByAccountId(accountId);
// 从远程 order-service 中获取 Order 对象
Mono<OrderMapper> orderMapper = getRemoteOrderByOrderNumber(orderNumber);
Mono<CustomerTicket> monoCustomerTicket =
Mono.zip(accountMapper, orderMapper).flatMap(tuple -> {
AccountMapper account = tuple.getT1();
OrderMapper order = tuple.getT2();
// 设置 CustomerTicket 对象属性
…
return Mono.just(customerTicket);
});
// 保存 CustomerTicket 对象并返回
return monoCustomerTicket.flatMap(customerTicketRepository::save);
}
|
另一方面,上一讲中我也给出了在 order-service 中基于函数式编程模型构建 Web 服务的实现过程。同样的,这里也以 getRemoteOrderByOrderNumber 方法为例来和你讨论如何完成与 Web 服务之间的远程调用。getRemoteOrderByOrderNumber 方法定义如下。
1
2
3
4
|
private Mono<OrderMapper> getRemoteOrderByOrderNumber(String orderNumber) {
return orderClient.getOrderByOrderNumber(orderNumber);
}
|
这里,我们会构建一个 ReactiveOrderClient 类来完成对 order-service 的远程访问,如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Component
public class ReactiveOrderClient {
public Mono<OrderMapper> getOrderByOrderNumber(String orderNumber) {
Mono<OrderMapper> orderMono = WebClient.create()
.get()
.uri("localhost:8081/orders/{orderNumber}", orderNumber)
.retrieve()
.bodyToMono(OrderMapper.class).log("getOrderFromRemote");
return orderMono;
}
}
|
你可以注意到,这里基于 WebClient.create() 工厂方法构建了一个 WebClient 对象,并通过它的 retrieve 方法来完成对远程 order-serivce 的请求过程。同时,我们看到这里的返回对象是一个 Mono
,而不是一个 Mono
对象。事实上,它们的内部字段都是一一对应的,只是位于两个不同的代码工程中,所以故意从命名上做了区分。WebClient 会自动完成 OrderMapper 与 Order 之间的自动映射。
小结与预告
在上一讲的基础上,这一讲我为你引入了 WebClient 模板类来完成对远程 HTTP 端点的响应式访问。WebClient 为开发人员提供了一大批有用的工具方法来实现 HTTP 请求的发送以及响应的获取。同时,该模板类还提供了一批定制化的入口供开发人员嵌入对 HTTP 请求过程进行精细化管理的处理逻辑。
这里给你留一道思考题:在使用 WebClient 时,如何实现对请求异常等控制逻辑的定制化处理?
关于远程调用,业界也存在一些协议,在网络通信层面提供非阻塞式的交互体验。RSocket 就是这一领域的主流实现,下一讲我将针对该协议和你一起展开讨论,到时候见。
点击链接,获取课程相关代码↓↓↓https://github.com/lagoueduCol/ReactiveProgramming-jianxiang.git
-– ### 精选评论 ##### **东: > 能不能讲讲原理哦,为啥webcclient能够实现非阻塞远程调用,是怎么融入到响应式体系里面去的? ###### 讲师回复: > 这块原理后面有机会再讲,要将的话涉及很多底层技术体系,篇幅会比较大 ##### **鹏: > 请问老师,如果要在线上使用webclient,应该怎么配置它的一些参数,像最大连接数,工作线程该配置多少,是否要使用netty的全局线程这些,项目中如果只有一部分用了mono,后面block拿到结果再去处理别的逻辑,会有问题吗,是不是必须整个链条都是mono的 ###### 讲师回复: > WebClient的参数可以参考官网的说明,也取决于项目的需求。 全链路都需要是响应式的,中间有一处是阻塞的,那么整个链路其他响应式部分就是无效的