From e44b08f1fc4b7fa4dc3e552b5e07c3ed1efa586c Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 2 Oct 2020 13:32:00 +0100 Subject: [PATCH] Minor refactoring in JettyClientHttpConnector See gh-25849 --- .../reactive/JettyClientHttpConnector.java | 21 ++++++++------ .../reactive/JettyClientHttpRequest.java | 29 +++++++------------ 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index 481d3c0582..4c048e5989 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,9 @@ import java.util.function.Consumer; import java.util.function.Function; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.reactive.client.ContentChunk; +import org.eclipse.jetty.reactive.client.ReactiveRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -81,7 +83,8 @@ public class JettyClientHttpConnector implements ClientHttpConnector { * Constructor with an {@link JettyResourceFactory} that will manage shared resources. * @param resourceFactory the {@link JettyResourceFactory} to use * @param customizer the lambda used to customize the {@link HttpClient} - * @deprecated as of 5.2, in favor of {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)} + * @deprecated as of 5.2, in favor of + * {@link JettyClientHttpConnector#JettyClientHttpConnector(HttpClient, JettyResourceFactory)} */ @Deprecated public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer customizer) { @@ -114,14 +117,14 @@ public class JettyClientHttpConnector implements ClientHttpConnector { } } - JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest( - this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory); + Request request = this.httpClient.newRequest(uri).method(method.toString()); - return requestCallback.apply(clientHttpRequest).then(Mono.from( - clientHttpRequest.getReactiveRequest().response((response, chunks) -> { - Flux content = Flux.from(chunks).map(this::toDataBuffer); - return Mono.just(new JettyClientHttpResponse(response, content)); - }))); + return requestCallback.apply(new JettyClientHttpRequest(request, this.bufferFactory)) + .then(Mono.from(ReactiveRequest.newBuilder(request).build() + .response((reactiveResponse, chunkPublisher) -> { + Flux content = Flux.from(chunkPublisher).map(this::toDataBuffer); + return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); + }))); } private DataBuffer toDataBuffer(ContentChunk chunk) { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index b019a6192c..89a23b165b 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.function.Function; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.reactive.client.ContentChunk; import org.eclipse.jetty.reactive.client.ReactiveRequest; +import org.eclipse.jetty.reactive.client.internal.PublisherContentProvider; import org.eclipse.jetty.util.Callback; import org.reactivestreams.Publisher; import reactor.core.Exceptions; @@ -37,7 +38,6 @@ import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; -import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -53,9 +53,6 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { private final DataBufferFactory bufferFactory; - @Nullable - private ReactiveRequest reactiveRequest; - public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) { this.jettyRequest = jettyRequest; @@ -87,20 +84,21 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { - Flux chunks = Flux.from(body).map(this::toContentChunk); - ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType()); - this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build(); + ReactiveRequest.Content content = Flux.from(body) + .map(this::toContentChunk) + .as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType())); + this.jettyRequest.content(new PublisherContentProvider(content)); return doCommit(this::completes); } @Override public Mono writeAndFlushWith(Publisher> body) { - Flux chunks = Flux.from(body) + ReactiveRequest.Content content = Flux.from(body) .flatMap(Function.identity()) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release) - .map(this::toContentChunk); - ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType()); - this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build(); + .map(this::toContentChunk) + .as(chunks -> ReactiveRequest.Content.fromPublisher(chunks, getContentType())); + this.jettyRequest.content(new PublisherContentProvider(content)); return doCommit(this::completes); } @@ -145,11 +143,4 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } } - ReactiveRequest getReactiveRequest() { - if (this.reactiveRequest == null) { - this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).build(); - } - return this.reactiveRequest; - } - }