parent
2b017fe540
commit
55aa8e914e
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -20,6 +20,7 @@ import org.springframework.http.HttpStatus;
|
|||
import org.springframework.http.ReactiveHttpInputMessage;
|
||||
import org.springframework.http.ResponseCookie;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Represents a client-side reactive HTTP response.
|
||||
|
@ -30,6 +31,15 @@ import org.springframework.util.MultiValueMap;
|
|||
*/
|
||||
public interface ClientHttpResponse extends ReactiveHttpInputMessage {
|
||||
|
||||
/**
|
||||
* Return an id that represents the underlying connection, if available,
|
||||
* or the request for the purpose of correlating log messages.
|
||||
* @since 5.3.5
|
||||
*/
|
||||
default String getId() {
|
||||
return ObjectUtils.getIdentityHexString(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the HTTP status code as an {@link HttpStatus} enum value.
|
||||
* @return the HTTP status as an HttpStatus enum value (never {@code null})
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -50,6 +50,11 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse {
|
|||
|
||||
// ClientHttpResponse delegation methods...
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return this.delegate.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpStatus getStatusCode() {
|
||||
return this.delegate.getStatusCode();
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -37,9 +37,11 @@ import org.springframework.http.HttpMethod;
|
|||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseCookie;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
|
||||
|
@ -51,6 +53,11 @@ import org.springframework.util.MultiValueMap;
|
|||
*/
|
||||
class ReactorClientHttpResponse implements ClientHttpResponse {
|
||||
|
||||
/** Reactor Netty 1.0.5+. */
|
||||
static final boolean reactorNettyRequestChannelOperationsIdPresent = ClassUtils.isPresent(
|
||||
"reactor.netty.ChannelOperationsId", ReactorClientHttpResponse.class.getClassLoader());
|
||||
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class);
|
||||
|
||||
private final HttpClientResponse response;
|
||||
|
@ -64,8 +71,6 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
|
||||
private final AtomicInteger state = new AtomicInteger();
|
||||
|
||||
private final String logPrefix;
|
||||
|
||||
|
||||
/**
|
||||
* Constructor that matches the inputs from
|
||||
|
@ -78,7 +83,6 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
|
||||
this.inbound = connection.inbound();
|
||||
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
|
||||
this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : "");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -92,10 +96,21 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
|
||||
this.inbound = inbound;
|
||||
this.bufferFactory = new NettyDataBufferFactory(alloc);
|
||||
this.logPrefix = "";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
String id = null;
|
||||
if (reactorNettyRequestChannelOperationsIdPresent) {
|
||||
id = ChannelOperationsIdHelper.getId(this.response);
|
||||
}
|
||||
if (id == null && this.response instanceof Connection) {
|
||||
id = ((Connection) this.response).channel().id().asShortText();
|
||||
}
|
||||
return (id != null ? id : ObjectUtils.getIdentityHexString(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return this.inbound.receive()
|
||||
|
@ -167,7 +182,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
void releaseAfterCancel(HttpMethod method) {
|
||||
if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(this.logPrefix + "Releasing body, not yet subscribed.");
|
||||
logger.debug("[" + getId() + "]" + "Releasing body, not yet subscribed.");
|
||||
}
|
||||
this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {});
|
||||
}
|
||||
|
@ -186,4 +201,18 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
"status=" + getRawStatusCode() + '}';
|
||||
}
|
||||
|
||||
|
||||
private static class ChannelOperationsIdHelper {
|
||||
|
||||
@Nullable
|
||||
public static String getId(HttpClientResponse response) {
|
||||
if (response instanceof reactor.netty.ChannelOperationsId) {
|
||||
return (logger.isDebugEnabled() ?
|
||||
((reactor.netty.ChannelOperationsId) response).asLongText() :
|
||||
((reactor.netty.ChannelOperationsId) response).asShortText());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -98,14 +98,14 @@ public abstract class ExchangeFunctions {
|
|||
Assert.notNull(clientRequest, "ClientRequest must not be null");
|
||||
HttpMethod httpMethod = clientRequest.method();
|
||||
URI url = clientRequest.url();
|
||||
String logPrefix = clientRequest.logPrefix();
|
||||
|
||||
return this.connector
|
||||
.connect(httpMethod, url, httpRequest -> clientRequest.writeTo(httpRequest, this.strategies))
|
||||
.doOnRequest(n -> logRequest(clientRequest))
|
||||
.doOnCancel(() -> logger.debug(logPrefix + "Cancel signal (to close connection)"))
|
||||
.doOnCancel(() -> logger.debug(clientRequest.logPrefix() + "Cancel signal (to close connection)"))
|
||||
.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, t -> wrapException(t, clientRequest))
|
||||
.map(httpResponse -> {
|
||||
String logPrefix = getLogPrefix(clientRequest, httpResponse);
|
||||
logResponse(httpResponse, logPrefix);
|
||||
return new DefaultClientResponse(
|
||||
httpResponse, this.strategies, logPrefix, httpMethod.name() + " " + url,
|
||||
|
@ -120,6 +120,10 @@ public abstract class ExchangeFunctions {
|
|||
);
|
||||
}
|
||||
|
||||
private String getLogPrefix(ClientRequest request, ClientHttpResponse response) {
|
||||
return request.logPrefix() + "[" + response.getId() + "] ";
|
||||
}
|
||||
|
||||
private void logResponse(ClientHttpResponse response, String logPrefix) {
|
||||
LogFormatUtils.traceDebug(logger, traceOn -> {
|
||||
int code = response.getRawStatusCode();
|
||||
|
|
Loading…
Reference in New Issue