zxcvbnmzsedr
4/13/2019 - 3:27 AM

spring cloud gateway read request body

Spring cloud gateway print request info

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StopWatch;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter.CACHED_REQUEST_BODY_KEY;

/**
 * read request body
 * @author zhaotianzeng
 * @version V1.0
 * @date 2019-04-11 20:02
 */
@Slf4j
public class StatisticalFilter implements GlobalFilter, Ordered {
    @Value("#{'${print.headers:}'.split(',')}")
    private Set<String> headers;
    private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";
    private static final String START_TIME = "startTime";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String requestUri = request.getPath().pathWithinApplication().value();
        MultiValueMap<String, String> queryParams = request.getQueryParams();

        Map<String, String> headerMap = new LinkedHashMap<>();
        // controll print header
        for (String h : headers) {
            if ("*".equalsIgnoreCase(h)) {
                headerMap = request.getHeaders().toSingleValueMap();
                break;
            } else {
                String header = request.getHeaders().getFirst(h);
                headerMap.put(h, header);
            }
        }

        // cost time
        StopWatch stopWatch = new StopWatch(requestUri);
        stopWatch.start();
        exchange.getAttributes().put(START_TIME, stopWatch);

        log.info("URL: {} Params {} ,Header: {}", requestUri, queryParams, headerMap);


        if (request.getMethod() == HttpMethod.POST) {
            return readBody(exchange).flatMap(exchange1 ->
                    chain.filter(exchange1).then(this.stop(exchange))
            );
        }
        return chain.filter(exchange).then(this.stop(exchange));
    }

    private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults().messageReaders();

    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }

    private Mono<Void> stop(ServerWebExchange exchange) {
        return Mono.fromRunnable(() -> {
            StopWatch stop = exchange.getAttribute(START_TIME);
            if (stop != null) {
                stop.stop();
                log.info("URL: {}   Cost:{}ms", stop.getId(), stop.getTotalTimeMillis());
            }
        });
    }

    /**
     * read body
     * <p>
     * reference {@link ReadBodyPredicateFactory}
     */
    private Mono<ServerWebExchange> readBody(ServerWebExchange exchange) {
        Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
        if (cachedBody != null) {
            return Mono.just(exchange);
        }
        return DataBufferUtils.join(exchange.getRequest().getBody())
                .flatMap(dataBuffer -> {
                    DataBufferUtils.retain(dataBuffer);
                    Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    ServerWebExchange build = exchange.mutate().request(mutatedRequest).build();
                    return Mono.just(build)
                            .map(ex -> {
                                ServerRequest.create(ex, MESSAGE_READERS)
                                        .bodyToMono(String.class)
                                        .subscribe(
                                                bodyString ->
                                                        log.info("Body: {}", bodyString.replaceAll("\\s", ""))
                                        );
                                return ex;
                            })
                            .doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, cachedFlux);
                            });
                });

    }


}