Reading the Request Body in Spring Cloud Gateway

Date: 2024-04-07 15:38:01

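Preface

The business requirement: the client sends a POST request with Content-Type: application/json. The gateway has to extract the body from the request for authorization checks and then repackage the processed data into the body before forwarding it. After falling into quite a few pits while tracking the problem down, I am writing this post in the hope that it helps anyone with the same need.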

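Reading the body worked with the version combination below; this post is not guaranteed to apply to other versions:
Spring Boot 2.0.6.RELEASE + Spring Cloud Finchley.SR2 + Spring Cloud Gateway
The Maven dependencies are as follows: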

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Spring Cloud dependencies -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Finchley.SR2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

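Pitfall History

At first I used Spring Boot 2.0.3.RELEASE + Spring Cloud Finchley.RELEASE + Spring Cloud Gateway and read the body with the approach below, which is all over the internet. This is not meant as criticism. I simply believe that when something is technically questionable you should verify it yourself, and that out of respect for the craft you should not repost or spread incorrect blog posts. The method that fails to read the body correctly: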

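    /**
     * Reads the string content of the request body
     * @param serverHttpRequest the incoming request
     * @return the body content collected at the moment the method returns
     */
    private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){
        // Get the request body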
        Flux<DataBuffer> body = serverHttpRequest.getBody();
        StringBuilder sb = new StringBuilder();

        body.subscribe(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer);
            String bodyString = new String(bytes, StandardCharsets.UTF_8);
            sb.append(bodyString);
        });
        return sb.toString();

    }

With this method, JSON bodies frequently come back incomplete. The problem is described in the troubleshooting section below.
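One likely explanation for the truncation is that the body arrives in several chunks, while the method returns right after calling subscribe, before the later chunks have been delivered. The sketch below reproduces that shape with plain JDK classes only. `ChunkSource` is a hypothetical stand-in for `Flux<DataBuffer>`, not a Reactor or Spring type, and the JSON text is made-up sample data.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ChunkedBodyDemo {

    /** Hypothetical stand-in for Flux<DataBuffer>: one chunk is buffered, the rest arrive later. */
    static final class ChunkSource {
        private final String[] chunks;
        private final CompletableFuture<Void> done = new CompletableFuture<>();
        private Consumer<String> subscriber;

        ChunkSource(String... chunks) {
            this.chunks = chunks;
        }

        /** Delivers the already-buffered first chunk at once; the future completes at end of body. */
        CompletableFuture<Void> subscribe(Consumer<String> onChunk) {
            this.subscriber = onChunk;
            onChunk.accept(chunks[0]);
            return done;
        }

        /** Simulates the network delivering the remaining chunks some time later. */
        void networkDeliversRest() {
            for (int i = 1; i < chunks.length; i++) {
                subscriber.accept(chunks[i]);
            }
            done.complete(null);
        }
    }

    /** Same shape as resolveBodyFromRequest: subscribe, then return without waiting. */
    static String readTooEarly(ChunkSource body) {
        StringBuilder sb = new StringBuilder();
        body.subscribe(sb::append);
        return sb.toString();
    }

    /** Hands back a future that yields the text only after the source signals completion. */
    static CompletableFuture<String> readWhenComplete(ChunkSource body) {
        StringBuilder sb = new StringBuilder();
        return body.subscribe(sb::append).thenApply(ignored -> sb.toString());
    }

    public static void main(String[] args) {
        ChunkSource first = new ChunkSource("{\"user\":\"tom\",", "\"age\":18}");
        String early = readTooEarly(first);
        first.networkDeliversRest();
        System.out.println(early); // prints {"user":"tom",

        ChunkSource second = new ChunkSource("{\"user\":\"tom\",", "\"age\":18}");
        CompletableFuture<String> full = readWhenComplete(second);
        second.networkDeliversRest();
        System.out.println(full.join()); // prints {"user":"tom","age":18}
    }
}
```

The point of the second variant is that the caller receives something that completes later instead of a plain String, so the result is only read once the whole body has arrived.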

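Troubleshooting

The error: the body received by the server was incomplete.

Checking the nginx log showed that the data nginx received from the client was complete.

What followed was a lot of debugging and searching.
The official project also has plenty of issues about this problem, such as https://github.com/spring-cloud/spring-cloud-gateway/issues/509. I asked there as well but was ignored, possibly because of my clumsy English. Still, the earlier questions in that thread, reading the source code, and this code on GitHub gave me a lot of inspiration, and I combined them with my own verification: https://github.com/xiaozhiliaoo/gateway-sample2/blob/master/src/main/java/org/lili/ApiLocator.java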

Scenario 1: form submission with application/x-www-form-urlencoded
The request is sent from Postman and received by a Controller on the server side. The gateway handles it with the following code:

    @Bean
    public RouteLocator activityRouter(RouteLocatorBuilder builder) {
        RouteLocatorBuilder.Builder routes = builder.routes();
        RouteLocatorBuilder.Builder serviceProvider = routes

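                /**
                 * Intercepts all requests under /test/**; lb:// means the request is routed
                 * through load balancing to the ORDER_SERVICE service.
                 * Hystrix supports two parameters:
                 *      name: the name of the HystrixCommand
                 *      fallbackUri: the URI used for fallback; only the forward: scheme is supported
                 *
                 */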
                .route(RouteIdEnum.TEST_ROUTE_ID.getCode(),
                        r -> r.readBody(String.class, requestBody -> {
                            logger.info("requestBody is {}", requestBody);
                            // The body is not inspected here; always match
                            return true;
                        }).and().path(contextPath.concat("/test/**"))
                                .filters(f -> f.stripPrefix(1).requestRateLimiter(c -> c.setRateLimiter(apiRedisRateLimiter()).setKeyResolver(apiKeyResolver))
                                        .filter(parameterWrapFilter).hystrix(h -> h.setName("activityInnerHystrixCommand").setFallbackUri("forward:/activityInner/hystrixFallback")))
                                .uri("lb://".concat("ORDER_SERVICE"))
                )


                ;
        RouteLocator routeLocator = serviceProvider.build();
        logger.info("ActivityServiceRouter is loading ... {}", routeLocator);
        return routeLocator;
    }

Scenario 2: JSON submission with application/json
Again the request is sent from Postman and received by a Controller on the server side. The gateway handles it with the following code:

   @Bean
    public RouteLocator activityRouter(RouteLocatorBuilder builder) {
        RouteLocatorBuilder.Builder routes = builder.routes();
        RouteLocatorBuilder.Builder serviceProvider = routes

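                /**
                 * Intercepts all requests under /test/**; lb:// means the request is routed
                 * through load balancing to the ISHANGJIE_ORDER_SERVICE service.
                 * Hystrix supports two parameters:
                 *      name: the name of the HystrixCommand
                 *      fallbackUri: the URI used for fallback; only the forward: scheme is supported
                 *
                 */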
                .route(RouteIdEnum.TEST_ROUTE_ID.getCode(),
                        r -> r.readBody(Object.class, requestBody -> {
                            logger.info("requestBody is {}", requestBody);
                            // The body is not inspected here; always match
                            return true;
                        }).and().path(contextPath.concat("/test/**"))
                                .filters(f -> f.stripPrefix(1).requestRateLimiter(c -> c.setRateLimiter(apiRedisRateLimiter()).setKeyResolver(apiKeyResolver))
                                        .filter(parameterWrapFilter).hystrix(h -> h.setName("activityInnerHystrixCommand").setFallbackUri("forward:/activityInner/hystrixFallback")))
                                .uri("lb://".concat(ServiceConstant.ISHANGJIE_ORDER_SERVICE))
                )
                
                ;
        RouteLocator routeLocator = serviceProvider.build();
        logger.info("ActivityServiceRouter is loading ... {}", routeLocator);
        return routeLocator;
    }

In both scenarios the filter obtains the body through Object object = exchange.getAttribute("cachedRequestBodyObject");
Sample code:

package com.ald.ishangjie.controller.filter.common;

import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBufAllocator;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @desc: Normalizes client requests and repackages their parameters
 * @date: 2019/1/3 17:52
 */
@Component
public class ParameterWrapFilter implements GatewayFilter, Ordered {
    private Logger logger = LoggerFactory.getLogger(ParameterWrapFilter.class);

    @Value("${default.app-version}")
    private int appVersion;

    @Value("${wechat.applet.token_secret}")
    private String TOKEN_SECRET;

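    /**
     * Filter logic
     * @param exchange the current server exchange
     * @param chain the filter chain to delegate to
     * @return a Mono that completes when request handling is done
     */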
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest serverHttpRequest = exchange.getRequest();
        ServerHttpResponse serverHttpResponse = exchange.getResponse();
        HttpMethod method = serverHttpRequest.getMethod();
        String contentType = serverHttpRequest.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
        String userName = serverHttpRequest.getHeaders().getFirst(CommonConstant.KEY_USERNAME);
        String netType = serverHttpRequest.getHeaders().getFirst(CommonConstant.KEY_NET_TYPE);
        String mapToken = serverHttpRequest.getHeaders().getFirst("mapToken");
        String id = "";
        Long userId = null;
        URI uri = serverHttpRequest.getURI();
        String path = uri.getPath();
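        InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
        // getHostString() avoids the reverse DNS lookup that getHostName() may trigger
        String ip = remoteAddress == null ? "" : remoteAddress.getHostString();
        // POST request whose content-type is application/json or application/x-www-form-urlencoded
        if (method == HttpMethod.POST) {
            // Get the POST body from the request; see https://github.com/xiaozhiliaoo/gateway-sample2/tree/master
            Object object = exchange.getAttribute("cachedRequestBodyObject");
            String bodyStr;
            if (object == null) {
                bodyStr = "";
            } else if (object instanceof String) {
                // readBody(String.class) caches a String, which cannot be cast to JSONObject
                bodyStr = (String) object;
            } else {
                bodyStr = JSONObject.toJSONString(object);
            }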
     /*       String bodyStr = resolveBodyFromRequest(serverHttpRequest);
            logger.info("RequestWrapFilter uri={},bodyStr={}", uri, bodyStr);
            // In-app H5 request
            if (path.contains(CommonConstant.INNER_H5) || path.contains(CommonConstant.INNER_H5_OLD)) {
                // Validate the in-app parameters and parse them into the appInfoBo object
                AppInfoBo appInfoBo = InnerParamCheckUtil.doParamCheck(exchange, tokenRedisCache);
                // Parameter conversion produced nothing: respond with an error directly
                if (null == appInfoBo) {
                    return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.H5_AUTH_CHECK_ERROR)));
                }
                userName = appInfoBo.getUserName();
                netType = appInfoBo.getNetType();
                id = appInfoBo.getId();

                // External (out-of-app) H5 request
            } else if (path.contains(CommonConstant.OUTER_H5) || path.contains(CommonConstant.OUTER_H5_OLD)) {
                String token = serverHttpRequest.getHeaders().getFirst("token");
                if (StringUtils.isNotBlank(token)) {
                    userName = BaseTokenUtil.getUserName(token, TOKEN_SECRET);
                }
            }else{
                logger.info("RequestWrapFilter exception uri={},bodyStr={}", uri, bodyStr);
            }

            if (StringUtils.isNotBlank(userName)) {
                UserDo userDo = this.userService.getUserByUserName(userName);
                if (null != userDo) {
                    userId = userDo.getRid();
                }
            }


            // Repackage bodyStr
            logger.info("gateway filter contentType={}", contentType);
            if (StringUtils.isNotBlank(bodyStr)) {
                // application/json format
                if (MediaType.APPLICATION_JSON_VALUE.equalsIgnoreCase(contentType) || MediaType.APPLICATION_JSON_UTF8_VALUE.equalsIgnoreCase(contentType)) {
                    // Parse bodyStr into a JSONObject
                    JSONObject json = JSONObject.parseObject(bodyStr);
                    bodyStr = this.fillJsonBody(json, userName, netType, userId, ip, id);
                } else if (MediaType.APPLICATION_FORM_URLENCODED_VALUE.equalsIgnoreCase(contentType)) {
                    // Plain key-value pairs: append the extra parameters
                    bodyStr = String.format("%s&%s", bodyStr, this.fillParamBody(userName, netType, userId, ip, id));
                }
            } else {
                logger.error("POST body is null,url={}", serverHttpRequest.getURI().getRawPath());
                // JSON format: application/json
                if (MediaType.APPLICATION_JSON_VALUE.equalsIgnoreCase(contentType) || MediaType.APPLICATION_JSON_UTF8_VALUE.equalsIgnoreCase(contentType)) {
                    preWarnUtil.warn("ParameterWrapFilter/filter", IShangJieExceptionCode.BODY_IS_NULL_ERROR);
                    // Return a parameter-error response
                    return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.PARAM_ERROR)));

                // Form format: application/x-www-form-urlencoded
                } else if (MediaType.APPLICATION_FORM_URLENCODED_VALUE.equalsIgnoreCase(contentType)) {
                    // Plain key-value pairs: append the extra parameters
                    bodyStr = String.format("?%s", this.fillParamBody(userName, netType, userId, ip, id));
                }
            }
*/
            // Rebuild the request and pass it down the chain
            ServerHttpRequest request = serverHttpRequest.mutate().uri(uri).build();
            DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
            Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
            // Define the new headers
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());

            // Add the header
            headers.set(CommonConstant.KEY_MAP_TOKEN, mapToken);

            // The body may have changed, so CONTENT_LENGTH must be reset; it is the length in bytes, not in characters
            int length = bodyStr.getBytes(StandardCharsets.UTF_8).length;
            headers.remove(HttpHeaders.CONTENT_LENGTH);
            headers.setContentLength(length);

            // Set CONTENT_TYPE
            if (StringUtils.isNotBlank(contentType)) {
                headers.set(HttpHeaders.CONTENT_TYPE, contentType);
            } else {
                logger.info("=====StringUtils.isBlank(contentType)=====");
                preWarnUtil.warn("ParameterWrapFilter/filter", IShangJieExceptionCode.CONTENT_TYPE_IS_NULL_ERROR);
            }
            // A POST body can only be subscribed to once, and it has already been consumed above,
            // so the request has to be wrapped again; otherwise an "already subscribed" error is raised
            request = new ServerHttpRequestDecorator(request) {
                @Override
                public HttpHeaders getHeaders() {
                    // Expose the rebuilt headers, including the recomputed CONTENT_LENGTH
                    return headers;
                }

                @Override
                public Flux<DataBuffer> getBody() {
                    return bodyFlux;
                }
            };
            return chain.filter(exchange.mutate().request(request).build());

        } else if (method == HttpMethod.GET) {
            LogUtil.addLog(LogLevelEnum.INFO, "ParameterWrapFilter failed,path={}", serverHttpRequest.getURI().getPath());
            return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.GET_NOT_ALLOW_ERROR)));
        }
        return chain.filter(exchange);
    }


    /**
     * Fills in the extra fields for a JSON body
     */
    private String fillJsonBody(JSONObject json, String userName, String netType, Long userId, String ip, String id) {
        if (null == json) {
            json = new JSONObject();
        }
        json.put(CommonConstant.KEY_APP_VERSION, appVersion);
        json.put(CommonConstant.KEY_USERNAME, userName);
        json.put(CommonConstant.KEY_NET_TYPE, netType);
        json.put(CommonConstant.KEY_USER_ID, userId);
        json.put(CommonConstant.KEY_IP, ip);
        json.put(CommonConstant.KEY_ID, id);
        // Convert back to a string
        return JSONObject.toJSONString(json);
    }


    /**
     * Builds the extra parameters as a key=value string for a form body
     */
    private String fillParamBody(String userName, String netType, Long userId, String ip, String id) {
        StringBuilder buffer = new StringBuilder();
        buffer.append(CommonConstant.KEY_USERNAME).append("=").append(userName)
            .append("&").append(CommonConstant.KEY_NET_TYPE).append("=").append(netType)
            .append("&").append(CommonConstant.KEY_APP_VERSION).append("=").append(appVersion)
            .append("&").append(CommonConstant.KEY_USER_ID).append("=").append(userId)
            .append("&").append(CommonConstant.KEY_IP).append("=").append(ip)
            .append("&").append(CommonConstant.KEY_ID).append("=").append(id);

        return buffer.toString();
    }

     // Not called by filter(): it blocks the calling thread and reads only the first DataBuffer
     public String getBody(ServerHttpRequest serverHttpRequest){
         String body = null;
         try {
             ByteBuffer byteBuffer = Mono.from(serverHttpRequest.getBody()).toFuture().get().asByteBuffer();
             // Size the array by the readable bytes, not by the buffer capacity
             byte[] bytes = new byte[byteBuffer.remaining()];
             byteBuffer.get(bytes);
             body = new String(bytes, StandardCharsets.UTF_8);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             logger.error("Interrupted while reading the request body", e);
         } catch (ExecutionException e) {
             logger.error("Failed to read the request body", e);
         }
         return body;
     }

    /**
     * Converts a string into a DataBuffer
     *
     * @param value the string to wrap
     * @return a buffer holding the UTF-8 bytes of the string
     */
    private DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }

    /**
     * Highest precedence
     *
     * @return the filter order
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
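
Two details in the filter above are easy to get wrong: fillParamBody concatenates values into a form body without URL-encoding them, and Content-Length has to be the byte count rather than the character count. The following JDK-only sketch illustrates both under stated assumptions. The class name `FormBodyRewrite` and the parameter names `userName` and `netType` are hypothetical, since the real values of the CommonConstant keys are not shown in this post, and keys and values are assumed to be non-null.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class FormBodyRewrite {

    /** URL-encodes one key or value as UTF-8 (Java 8 compatible overload). */
    static String encode(String s) {
        try {
            return URLEncoder.encode(s, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Appends extra parameters to an application/x-www-form-urlencoded body. */
    static String appendParams(String body, Map<String, String> extra) {
        StringBuilder sb = new StringBuilder(body == null ? "" : body);
        for (Map.Entry<String, String> e : extra.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    /** Content-Length counts the bytes sent on the wire, not Java chars. */
    static int contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("userName", "\u5f20\u4e09"); // hypothetical key, two Chinese characters as value
        extra.put("netType", "wifi");          // hypothetical key and value

        String body = appendParams("a=1", extra);
        System.out.println(body); // prints a=1&userName=%E5%BC%A0%E4%B8%89&netType=wifi

        String raw = "\u5f20\u4e09";
        System.out.println(raw.length() + " chars, " + contentLength(raw) + " bytes"); // prints 2 chars, 6 bytes
    }
}
```

Once the values are URL-encoded the form body is pure ASCII, so its character count and byte count coincide. A JSON body that carries non-ASCII text is where the two diverge, which is why the byte length is the one to put in the header.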

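Results of running both scenarios in Postman, together with the parameters received by the server:
In the tests above the service received the data in every case. I load-tested 1000 requests with Postman and saw no abnormal data.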
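
For readers who prefer a repeatable check over a manual Postman run, a body-integrity test can be sketched with the JDK alone: send many concurrent POSTs with a fairly large JSON body and count the responses that do not match byte for byte. The sketch assumes an endpoint that echoes the request body. Here a local `HttpServer` plays that role so that the program is self-contained; in a real check the URL would point at a gateway route whose backend echoes the body. All class and method names are illustrative and not taken from the project in this post.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BodyIntegrityCheck {
    /** Reads a stream to the end (kept Java 8 compatible). */
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
        return out.toByteArray();
    }

    /** Local echo server on a free port; a stand-in for "gateway + backend" in this sketch. */
    static HttpServer startEchoServer() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/echo", exchange -> {
                byte[] body = readAll(exchange.getRequestBody());
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) { os.write(body); }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** POSTs the body once; true if the echoed bytes equal what was sent. */
    static boolean roundTripMatches(String url, byte[] body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setFixedLengthStreamingMode(body.length);
        try (OutputStream os = conn.getOutputStream()) { os.write(body); }
        try (InputStream is = conn.getInputStream()) { return Arrays.equals(body, readAll(is)); }
    }

    /** Sends `requests` POSTs from `threads` workers and counts mismatching echoes. */
    static int countMismatches(String url, byte[] body, int requests, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                results.add(pool.submit(() -> roundTripMatches(url, body)));
            }
            int mismatches = 0;
            for (Future<Boolean> result : results) {
                try {
                    if (!result.get()) mismatches++;
                } catch (Exception e) {
                    mismatches++; // a failed request counts as a mismatch
                }
            }
            return mismatches;
        } finally {
            pool.shutdown();
        }
    }

    /** Builds a JSON body with `size` padding characters, large enough to span several chunks. */
    static byte[] sampleBody(int size) {
        char[] pad = new char[size];
        Arrays.fill(pad, 'x');
        return ("{\"data\":\"" + new String(pad) + "\"}").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        HttpServer server = startEchoServer();
        try {
            String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/echo";
            System.out.println("mismatches: " + countMismatches(url, sampleBody(20000), 200, 8));
        } finally {
            server.stop(0);
        }
    }
}
```

A count of zero against the real gateway would correspond to the "no abnormal data" observation above; any truncated body would show up as a mismatch.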