WritetapConnector internal refactoring

Extract a common delegate class to share between the request and the
to wiretap a Publisher and record and buffer its data.

Preparation for SPR-17363.
This commit is contained in:
Rossen Stoyanchev 2018-10-11 13:51:38 -04:00
parent 050f44d75b
commit c567e65eea
1 changed files with 103 additions and 68 deletions

View File

@ -50,9 +50,6 @@ import org.springframework.util.Assert;
*/ */
class WiretapConnector implements ClientHttpConnector { class WiretapConnector implements ClientHttpConnector {
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private final ClientHttpConnector delegate; private final ClientHttpConnector delegate;
private final Map<String, Info> exchanges = new ConcurrentHashMap<>(); private final Map<String, Info> exchanges = new ConcurrentHashMap<>();
@ -117,7 +114,93 @@ class WiretapConnector implements ClientHttpConnector {
public ExchangeResult createExchangeResult(@Nullable String uriTemplate) { public ExchangeResult createExchangeResult(@Nullable String uriTemplate) {
return new ExchangeResult(this.request, this.response, return new ExchangeResult(this.request, this.response,
this.request.getContent(), this.response.getContent(), uriTemplate); this.request.getRecorder().getContent(), this.response.getRecorder().getContent(), uriTemplate);
}
}
/**
* Tap into a Publisher of data buffers to save the content.
*/
final static class WiretapRecorder {
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
public static final byte[] EMPTY_CONTENT = new byte[0];
@Nullable
private final Publisher<? extends DataBuffer> publisher;
@Nullable
private final Publisher<? extends Publisher<? extends DataBuffer>> publisherNested;
private final DataBuffer buffer;
private final MonoProcessor<byte[]> content;
private WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
if (publisher != null && publisherNested != null) {
throw new IllegalArgumentException("At most one publisher expected");
}
this.publisher = publisher != null ?
Flux.from(publisher)
.doOnNext(this::handleOnNext)
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.publisherNested = publisherNested != null ?
Flux.from(publisherNested)
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError))
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete) : null;
this.buffer = bufferFactory.allocateBuffer();
this.content = MonoProcessor.create();
if (this.publisher == null && this.publisherNested == null) {
this.content.onNext(EMPTY_CONTENT);
}
}
public Publisher<? extends DataBuffer> getPublisherToUse() {
Assert.notNull(this.publisher, "Publisher not in use.");
return this.publisher;
}
public Publisher<? extends Publisher<? extends DataBuffer>> getNestedPublisherToUse() {
Assert.notNull(this.publisherNested, "Nested publisher not in use.");
return this.publisherNested;
}
public MonoProcessor<byte[]> getContent() {
return this.content;
}
private void handleOnNext(DataBuffer nextBuffer) {
this.buffer.write(nextBuffer);
}
private void handleOnError(Throwable ex) {
if (!this.content.isTerminated()) {
this.content.onError(ex);
}
}
private void handleOnComplete() {
if (!this.content.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.content.onNext(bytes);
}
} }
} }
@ -127,68 +210,36 @@ class WiretapConnector implements ClientHttpConnector {
*/ */
private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator { private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator {
private final DataBuffer buffer; @Nullable
private WiretapRecorder recorder;
private final MonoProcessor<byte[]> body = MonoProcessor.create();
public WiretapClientHttpRequest(ClientHttpRequest delegate) { public WiretapClientHttpRequest(ClientHttpRequest delegate) {
super(delegate); super(delegate);
this.buffer = bufferFactory.allocateBuffer();
} }
public WiretapRecorder getRecorder() {
/** Assert.notNull(this.recorder, "No WiretapRecorder: was the client request written?");
* Return a "promise" with the request body content written to the server. return this.recorder;
*/
public MonoProcessor<byte[]> getContent() {
return this.body;
} }
@Override @Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) { public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
return super.writeWith( this.recorder = new WiretapRecorder(publisher, null);
Flux.from(publisher) return super.writeWith(this.recorder.getPublisherToUse());
.doOnNext(this::handleOnNext)
.doOnError(this::handleError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete));
} }
@Override @Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> publisher) { public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
return super.writeAndFlushWith( this.recorder = new WiretapRecorder(null, publisher);
Flux.from(publisher) return super.writeAndFlushWith(this.recorder.getNestedPublisherToUse());
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleError))
.doOnError(this::handleError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete));
} }
@Override @Override
public Mono<Void> setComplete() { public Mono<Void> setComplete() {
handleOnComplete(); this.recorder = new WiretapRecorder(null, null);
return super.setComplete(); return super.setComplete();
} }
private void handleOnNext(DataBuffer buffer) {
this.buffer.write(buffer);
}
private void handleError(Throwable ex) {
if (!this.body.isTerminated()) {
this.body.onError(ex);
}
}
private void handleOnComplete() {
if (!this.body.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.body.onNext(bytes);
}
}
} }
@ -197,39 +248,23 @@ class WiretapConnector implements ClientHttpConnector {
*/ */
private static class WiretapClientHttpResponse extends ClientHttpResponseDecorator { private static class WiretapClientHttpResponse extends ClientHttpResponseDecorator {
private final DataBuffer buffer; private final WiretapRecorder recorder;
private final MonoProcessor<byte[]> body = MonoProcessor.create();
public WiretapClientHttpResponse(ClientHttpResponse delegate) { public WiretapClientHttpResponse(ClientHttpResponse delegate) {
super(delegate); super(delegate);
this.buffer = bufferFactory.allocateBuffer(); this.recorder = new WiretapRecorder(super.getBody(), null);
} }
/** public WiretapRecorder getRecorder() {
* Return a "promise" with the response body content read from the server. return this.recorder;
*/
public MonoProcessor<byte[]> getContent() {
return this.body;
} }
@Override @Override
@SuppressWarnings("ConstantConditions")
public Flux<DataBuffer> getBody() { public Flux<DataBuffer> getBody() {
return super.getBody() return Flux.from(this.recorder.getPublisherToUse());
.doOnNext(this.buffer::write)
.doOnError(this.body::onError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete);
}
private void handleOnComplete() {
if (!this.body.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.body.onNext(bytes);
}
} }
} }