Spring Integration的网络通量支持

时间:2022-12-14 20:00:09

Spring Integration的网络通量支持

WebFlux Spring 集成模块 () 允许以反应方式执行 HTTP 请求和处理入站 HTTP 请求。​​spring-integration-webflux​

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.0.0</version>
</dependency>

在非基于 Servlet 的服务器配置的情况下,必须包含依赖关系。​​io.projectreactor.netty:reactor-netty​

WebFlux 支持由以下网关实现组成:和 。 该支持完全基于Spring WebFlux和Project Actor基础。 有关详细信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间共享。​​WebFluxInboundEndpoint​​​​WebFluxRequestExecutingMessageHandler​

WebFlux 命名空间支持

Spring 集成提供了一个命名空间和相应的模式定义。 若要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:​​webflux​

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>

WebFlux 入站组件

从版本 5.0 开始,提供了 的实现。 这个组件类似于基于MVC的,它通过新提取的共享一些常用选项。 它用于Spring WebFlux反应式环境(而不是MVC)。 以下示例显示了 WebFlux 端点的简单实现:​​WebFluxInboundEndpoint​​​​WebHandler​​​​HttpRequestHandlingEndpointSupport​​​​BaseHttpInboundEndpoint​

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}

配置类似于(示例前面提到的),不同之处在于我们用于将 WebFlux 基础架构添加到我们的集成应用程序中。 此外,通过使用反应式 HTTP 服务器实现提供的基于按需的背压功能对下游流执行操作。​​HttpRequestHandlingEndpointSupport​​​​@EnableWebFlux​​​​WebFluxInboundEndpoint​​​​sendAndReceive​

回复部分也是非阻塞的,并且基于内部 ,该部分平面映射到按需解决的回复。​​FutureReplyChannel​​​​Mono​

您可以使用自定义、 甚至 . 后者提供了一种机制,可用于将回复作为任何响应式类型返回:Reactor 、RxJava 等。 这样,我们就可以使用 Spring 集成组件实现服务器发送事件场景,如以下示例所示:​​WebFluxInboundEndpoint​​​​ServerCodecConfigurer​​​​RequestedContentTypeResolver​​​​ReactiveAdapterRegistry​​​​Flux​​​​Observable​​​​Flowable​

@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}

有关更多可能的配置选项,请参阅请求映射支持和跨源资源共享 (CORS) 支持。

当请求正文为空或返回 时,请求参数 () 用于处理目标消息的 。​​payloadExpression​​​​null​​​​MultiValueMap<String, String>​​​​payload​

有效负载验证

从版本 5.2 开始,可以使用 . 与 HTTP 支持中的 MVC 验证不同,它用于在执行回退和函数之前验证请求已转换为的元素。 框架无法假设对象在构建最终有效负载后会有多复杂。 如果需要限制最终有效负载(或其元素)的验证可见性,则应将验证转移到下游,而不是 WebFlux 端点。 更多信息请参阅Spring WebFlux文档。 无效的有效负载被拒绝,并显示包含所有验证的(扩展名)。 在 Spring 框架参考手册中查看有关验证的更多信息。​​WebFluxInboundEndpoint​​​​Validator​​​​Publisher​​​​HttpMessageReader​​​​payloadExpression​​​​Publisher​​​​Publisher​​​​IntegrationWebExchangeBindException​​​​WebExchangeBindException​​​​Errors​

WebFlux 出站组件

(从版本 5.0 开始)实现类似于 。 它使用来自Spring Framework WebFlux模块。 要配置它,请定义类似于以下内容的 Bean:​​WebFluxRequestExecutingMessageHandler​​​​HttpRequestExecutingMessageHandler​​​​WebClient​

@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}

该操作返回 ,该 映射到 (通过使用几个步骤)作为 的输出。 与 as 一起,评估将推迟到进行下游订阅。 否则,它被视为一种模式,并且响应将适于 a 以获得来自 的异步回复。 输出消息的目标有效负载取决于配置。 或标识响应正文元素转换的目标类型。 如果设置为 ,则响应正文将转换为 a,并为每个元素提供,并将其作为有效负载发送到下游。 之后,您可以使用拆分器以反应方式迭代它。​​WebClient​​​​exchange()​​​​Mono<ClientResponse>​​​​Mono.map()​​​​AbstractIntegrationMessageBuilder​​​​WebFluxRequestExecutingMessageHandler​​​​ReactiveChannel​​​​outputChannel​​​​Mono<ClientResponse>​​​​async​​​​Mono​​​​SettableListenableFuture​​​​WebFluxRequestExecutingMessageHandler​​​​WebFluxRequestExecutingMessageHandler​​​​setExpectedResponseType(Class<?>)​​​​setExpectedResponseTypeExpression(Expression)​​​​replyPayloadToFlux​​​​true​​​​Flux​​​​expectedResponseType​​​​Flux​​​​Flux​

此外,可以注入 a 而不是 and 属性。 它可用于对正文和 HTTP 标头转换的低级访问和更多控制。 Spring 集成作为一个标识函数来生成(下游)整体和任何其他可能的自定义逻辑。​​BodyExtractor<?, ClientHttpResponse>​​​​WebFluxRequestExecutingMessageHandler​​​​expectedResponseType​​​​replyPayloadToFlux​​​​ClientHttpResponse​​​​ClientHttpResponseBodyExtractor​​​​ClientHttpResponse​

从版本 5.2 开始,支持 反应式 、 和类型作为请求消息有效负载。 在内部使用相应的 以填充到 . 当有效负载是反应式 时,已配置 或 可用于确定发布者元素类型的类型。 表达式必须解析为 ,该解析为目标或 。​​WebFluxRequestExecutingMessageHandler​​​​Publisher​​​​Resource​​​​MultiValueMap​​​​BodyInserter​​​​WebClient.RequestBodySpec​​​​Publisher​​​​publisherElementType​​​​publisherElementTypeExpression​​​​Class<?>​​​​String​​​​Class<?>​​​​ParameterizedTypeReference​

从版本 5.5 开始,公开一个标志(默认情况下)以仅返回响应正文,或返回整体作为回复消息有效负载,与提供的 or 无关。 如果 中不存在主体,则忽略此标志并返回整体。​​WebFluxRequestExecutingMessageHandler​​​​extractResponseBody​​​​true​​​​ResponseEntity​​​​expectedResponseType​​​​replyPayloadToFlux​​​​ResponseEntity​​​​ResponseEntity​

有关更多可能的配置选项,请参阅 HTTP 出站组件。

WebFlux 标头映射

由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。 有关用于映射标头的更多可能选项和组件,请参阅 HTTP 标头映射。

网络通量请求属性

从版本 6.0 开始,可以将 配置为通过 评估请求属性。 必须在 中计算此 SpEL 表达式。 然后将此类映射传播到 HTTP 请求配置回调。 如果需要将键值对象形式的信息从请求传递,并且下游过滤器将访问这些属性以进行进一步处理,这将很有帮助。​​WebFluxRequestExecutingMessageHandler​​​​setAttributeVariablesExpression()​​​​Map​​​​WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)​​​​Message​