Polishing
Two changes to web.reactive.function: - Changed Response.stream method to allow for specific Publisher types to be returned in Response. - Router now stores HttpMessageReader|Writer retrieved from Configuration in the attributes as supplier, not as stream, to allow for multiple reads.
This commit is contained in:
parent
d1f60e3de1
commit
f8ac17f278
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.web.reactive.function;
|
package org.springframework.web.reactive.function;
|
||||||
|
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
@ -56,8 +57,9 @@ abstract class AbstractHttpMessageWriterResponse<T> extends AbstractResponse<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<HttpMessageWriter<?>> messageWriterStream(ServerWebExchange exchange) {
|
private Stream<HttpMessageWriter<?>> messageWriterStream(ServerWebExchange exchange) {
|
||||||
return exchange.<Stream<HttpMessageWriter<?>>>getAttribute(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE)
|
return exchange.<Supplier<Stream<HttpMessageWriter<?>>>>getAttribute(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE)
|
||||||
.orElseThrow(() -> new IllegalStateException("Could not find HttpMessageWriters in ServerWebExchange"));
|
.orElseThrow(() -> new IllegalStateException("Could not find HttpMessageWriters in ServerWebExchange"))
|
||||||
|
.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -200,8 +201,9 @@ class DefaultRequest implements Request {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<HttpMessageReader<?>> messageReaderStream(ServerWebExchange exchange) {
|
private Stream<HttpMessageReader<?>> messageReaderStream(ServerWebExchange exchange) {
|
||||||
return exchange.<Stream<HttpMessageReader<?>>>getAttribute(Router.HTTP_MESSAGE_READERS_ATTRIBUTE)
|
return exchange.<Supplier<Stream<HttpMessageReader<?>>>>getAttribute(Router.HTTP_MESSAGE_READERS_ATTRIBUTE)
|
||||||
.orElseThrow(() -> new IllegalStateException("Could not find HttpMessageReaders in ServerWebExchange"));
|
.orElseThrow(() -> new IllegalStateException("Could not find HttpMessageReaders in ServerWebExchange"))
|
||||||
|
.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -137,7 +137,7 @@ class DefaultResponseBuilder implements Response.BodyBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Response<Publisher<T>> stream(Publisher<T> publisher, Class<T> elementClass) {
|
public <T, S extends Publisher<T>> Response<S> stream(S publisher, Class<T> elementClass) {
|
||||||
Assert.notNull(publisher, "'publisher' must not be null");
|
Assert.notNull(publisher, "'publisher' must not be null");
|
||||||
Assert.notNull(elementClass, "'elementClass' must not be null");
|
Assert.notNull(elementClass, "'elementClass' must not be null");
|
||||||
return new PublisherResponse<>(this.statusCode, this.headers, publisher, elementClass);
|
return new PublisherResponse<>(this.statusCode, this.headers, publisher, elementClass);
|
||||||
|
|
|
@ -26,22 +26,22 @@ import org.springframework.web.server.ServerWebExchange;
|
||||||
/**
|
/**
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
*/
|
*/
|
||||||
class PublisherResponse<T> extends AbstractHttpMessageWriterResponse<Publisher<T>> {
|
class PublisherResponse<T, S extends Publisher<T>> extends AbstractHttpMessageWriterResponse<S> {
|
||||||
|
|
||||||
private final Publisher<T> body;
|
private final S body;
|
||||||
|
|
||||||
private final ResolvableType bodyType;
|
private final ResolvableType bodyType;
|
||||||
|
|
||||||
|
|
||||||
public PublisherResponse(int statusCode, HttpHeaders headers,
|
public PublisherResponse(int statusCode, HttpHeaders headers,
|
||||||
Publisher<T> body, Class<T> aClass) {
|
S body, Class<T> aClass) {
|
||||||
super(statusCode, headers);
|
super(statusCode, headers);
|
||||||
this.body = body;
|
this.body = body;
|
||||||
this.bodyType = ResolvableType.forClass(aClass);
|
this.bodyType = ResolvableType.forClass(aClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Publisher<T> body() {
|
public S body() {
|
||||||
return this.body;
|
return this.body;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -308,7 +308,7 @@ public interface Response<T> {
|
||||||
* @param <T> the type of the elements contained in the publisher
|
* @param <T> the type of the elements contained in the publisher
|
||||||
* @return the built response
|
* @return the built response
|
||||||
*/
|
*/
|
||||||
<T> Response<Publisher<T>> stream(Publisher<T> publisher, Class<T> elementClass);
|
<T, S extends Publisher<T>> Response<S> stream(S publisher, Class<T> elementClass);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the body of the response to the given {@link Resource} and return it.
|
* Set the body of the response to the given {@link Resource} and return it.
|
||||||
|
|
|
@ -63,14 +63,14 @@ public abstract class Router {
|
||||||
public static final String REQUEST_ATTRIBUTE = Router.class.getName() + ".request";
|
public static final String REQUEST_ATTRIBUTE = Router.class.getName() + ".request";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of the {@link ServerWebExchange} attribute that contains the
|
* Name of the {@link ServerWebExchange} attribute that contains a {@link Supplier} to the
|
||||||
* {@linkplain Stream stream} of {@link HttpMessageReader}s obtained
|
* {@linkplain Stream stream} of {@link HttpMessageReader}s obtained
|
||||||
* from the {@linkplain Configuration#messageReaders() configuration}.
|
* from the {@linkplain Configuration#messageReaders() configuration}.
|
||||||
*/
|
*/
|
||||||
public static final String HTTP_MESSAGE_READERS_ATTRIBUTE = Router.class.getName() + ".httpMessageReaders";
|
public static final String HTTP_MESSAGE_READERS_ATTRIBUTE = Router.class.getName() + ".httpMessageReaders";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of the {@link ServerWebExchange} attribute that contains the
|
* Name of the {@link ServerWebExchange} attribute that contains a {@link Supplier} to the
|
||||||
* {@linkplain Stream stream} of {@link HttpMessageWriter}s obtained
|
* {@linkplain Stream stream} of {@link HttpMessageWriter}s obtained
|
||||||
* from the {@linkplain Configuration#messageWriters() configuration}.
|
* from the {@linkplain Configuration#messageWriters() configuration}.
|
||||||
*/
|
*/
|
||||||
|
@ -213,8 +213,8 @@ public abstract class Router {
|
||||||
Configuration configuration) {
|
Configuration configuration) {
|
||||||
Map<String, Object> attributes = exchange.getAttributes();
|
Map<String, Object> attributes = exchange.getAttributes();
|
||||||
attributes.put(REQUEST_ATTRIBUTE, request);
|
attributes.put(REQUEST_ATTRIBUTE, request);
|
||||||
attributes.put(HTTP_MESSAGE_READERS_ATTRIBUTE, configuration.messageReaders().get());
|
attributes.put(HTTP_MESSAGE_READERS_ATTRIBUTE, configuration.messageReaders());
|
||||||
attributes.put(HTTP_MESSAGE_WRITERS_ATTRIBUTE, configuration.messageWriters().get());
|
attributes.put(HTTP_MESSAGE_WRITERS_ATTRIBUTE, configuration.messageWriters());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
package org.springframework.web.reactive.function;
|
package org.springframework.web.reactive.function;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -24,6 +27,7 @@ import org.springframework.core.codec.CharSequenceEncoder;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.codec.EncoderHttpMessageWriter;
|
import org.springframework.http.codec.EncoderHttpMessageWriter;
|
||||||
|
import org.springframework.http.codec.HttpMessageWriter;
|
||||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
|
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
|
||||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
|
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
|
||||||
import org.springframework.web.server.ServerWebExchange;
|
import org.springframework.web.server.ServerWebExchange;
|
||||||
|
@ -53,8 +57,9 @@ public class BodyResponseTests {
|
||||||
MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "http://localhost");
|
MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "http://localhost");
|
||||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||||
ServerWebExchange exchange = new DefaultServerWebExchange(request, response, new MockWebSessionManager());
|
ServerWebExchange exchange = new DefaultServerWebExchange(request, response, new MockWebSessionManager());
|
||||||
exchange.getAttributes().put(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE, Collections
|
Set<HttpMessageWriter<?>> messageWriters = Collections.singleton(new EncoderHttpMessageWriter<CharSequence>(new CharSequenceEncoder()));
|
||||||
.singleton(new EncoderHttpMessageWriter<CharSequence>(new CharSequenceEncoder())).stream());
|
exchange.getAttributes().put(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE,
|
||||||
|
(Supplier<Stream<HttpMessageWriter<?>>>) messageWriters::stream);
|
||||||
|
|
||||||
|
|
||||||
publisherResponse.writeTo(exchange).block();
|
publisherResponse.writeTo(exchange).block();
|
||||||
|
|
|
@ -26,6 +26,9 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.OptionalLong;
|
import java.util.OptionalLong;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -41,6 +44,7 @@ import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.HttpRange;
|
import org.springframework.http.HttpRange;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.codec.DecoderHttpMessageReader;
|
import org.springframework.http.codec.DecoderHttpMessageReader;
|
||||||
|
import org.springframework.http.codec.HttpMessageReader;
|
||||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||||
import org.springframework.util.LinkedMultiValueMap;
|
import org.springframework.util.LinkedMultiValueMap;
|
||||||
|
@ -155,14 +159,16 @@ public class DefaultRequestTests {
|
||||||
when(mockRequest.getHeaders()).thenReturn(httpHeaders);
|
when(mockRequest.getHeaders()).thenReturn(httpHeaders);
|
||||||
when(mockRequest.getBody()).thenReturn(body);
|
when(mockRequest.getBody()).thenReturn(body);
|
||||||
|
|
||||||
|
Set<HttpMessageReader<?>> messageReaders = Collections
|
||||||
|
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()));
|
||||||
when(mockExchange.getAttribute(Router.HTTP_MESSAGE_READERS_ATTRIBUTE))
|
when(mockExchange.getAttribute(Router.HTTP_MESSAGE_READERS_ATTRIBUTE))
|
||||||
.thenReturn(Optional.of(Collections
|
.thenReturn(Optional.of(
|
||||||
.singleton(new DecoderHttpMessageReader<String>(new StringDecoder()))
|
(Supplier<Stream<HttpMessageReader<?>>>) messageReaders::stream));
|
||||||
.stream()));
|
|
||||||
|
|
||||||
assertEquals(body, defaultRequest.body().stream());
|
assertEquals(body, defaultRequest.body().stream());
|
||||||
|
|
||||||
Mono<String> resultMono = defaultRequest.body().convertToMono(String.class);
|
Mono<String> resultMono = defaultRequest.body().convertToMono(String.class);
|
||||||
assertEquals("foo", resultMono.block());
|
assertEquals("foo", resultMono.block());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.web.reactive.function;
|
package org.springframework.web.reactive.function;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -27,10 +28,13 @@ import reactor.core.publisher.Mono;
|
||||||
import org.springframework.core.ParameterizedTypeReference;
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.RequestEntity;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.client.RestTemplate;
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.springframework.web.reactive.function.RequestPredicates.GET;
|
||||||
|
import static org.springframework.web.reactive.function.RequestPredicates.POST;
|
||||||
import static org.springframework.web.reactive.function.Router.route;
|
import static org.springframework.web.reactive.function.Router.route;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -49,8 +53,9 @@ public class PublisherHandlerFunctionIntegrationTests
|
||||||
@Override
|
@Override
|
||||||
protected RoutingFunction<?> routingFunction() {
|
protected RoutingFunction<?> routingFunction() {
|
||||||
PersonHandler personHandler = new PersonHandler();
|
PersonHandler personHandler = new PersonHandler();
|
||||||
return route(RequestPredicates.GET("/mono"), personHandler::mono)
|
return route(GET("/mono"), personHandler::mono)
|
||||||
.and(route(RequestPredicates.GET("/flux"), personHandler::flux));
|
.and(route(POST("/mono"), personHandler::postMono))
|
||||||
|
.and(route(GET("/flux"), personHandler::flux));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -76,6 +81,17 @@ public class PublisherHandlerFunctionIntegrationTests
|
||||||
assertEquals("Jane", body.get(1).getName());
|
assertEquals("Jane", body.get(1).getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void postMono() {
|
||||||
|
URI uri = URI.create("http://localhost:" + port + "/mono");
|
||||||
|
Person person = new Person("Jack");
|
||||||
|
RequestEntity<Person> requestEntity = RequestEntity.post(uri).body(person);
|
||||||
|
ResponseEntity<Person> result = restTemplate.exchange(requestEntity, Person.class);
|
||||||
|
|
||||||
|
assertEquals(HttpStatus.OK, result.getStatusCode());
|
||||||
|
assertEquals("Jack", result.getBody().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class PersonHandler {
|
private static class PersonHandler {
|
||||||
|
|
||||||
|
@ -84,6 +100,11 @@ public class PublisherHandlerFunctionIntegrationTests
|
||||||
return Response.ok().stream(Mono.just(person), Person.class);
|
return Response.ok().stream(Mono.just(person), Person.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Response<Publisher<Person>> postMono(Request request) {
|
||||||
|
Mono<Person> personMono = request.body().convertToMono(Person.class);
|
||||||
|
return Response.ok().stream(personMono, Person.class);
|
||||||
|
}
|
||||||
|
|
||||||
public Response<Publisher<Person>> flux(Request request) {
|
public Response<Publisher<Person>> flux(Request request) {
|
||||||
Person person1 = new Person("John");
|
Person person1 = new Person("John");
|
||||||
Person person2 = new Person("Jane");
|
Person person2 = new Person("Jane");
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
package org.springframework.web.reactive.function;
|
package org.springframework.web.reactive.function;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
@ -26,6 +29,7 @@ import org.springframework.core.codec.CharSequenceEncoder;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.codec.EncoderHttpMessageWriter;
|
import org.springframework.http.codec.EncoderHttpMessageWriter;
|
||||||
|
import org.springframework.http.codec.HttpMessageWriter;
|
||||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
|
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
|
||||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
|
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
|
||||||
import org.springframework.web.server.ServerWebExchange;
|
import org.springframework.web.server.ServerWebExchange;
|
||||||
|
@ -42,7 +46,7 @@ public class PublisherResponseTests {
|
||||||
|
|
||||||
private final Publisher<String> publisher = Flux.just("foo", "bar");
|
private final Publisher<String> publisher = Flux.just("foo", "bar");
|
||||||
|
|
||||||
private final PublisherResponse<String> publisherResponse =
|
private final PublisherResponse<String, ? extends Publisher<String>> publisherResponse =
|
||||||
new PublisherResponse<>(200, new HttpHeaders(), publisher, String.class);
|
new PublisherResponse<>(200, new HttpHeaders(), publisher, String.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -55,8 +59,9 @@ public class PublisherResponseTests {
|
||||||
MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "http://localhost");
|
MockServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "http://localhost");
|
||||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||||
ServerWebExchange exchange = new DefaultServerWebExchange(request, response, new MockWebSessionManager());
|
ServerWebExchange exchange = new DefaultServerWebExchange(request, response, new MockWebSessionManager());
|
||||||
exchange.getAttributes().put(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE, Collections
|
Set<HttpMessageWriter<?>> messageWriters = Collections.singleton(new EncoderHttpMessageWriter<CharSequence>(new CharSequenceEncoder()));
|
||||||
.singleton(new EncoderHttpMessageWriter<CharSequence>(new CharSequenceEncoder())).stream());
|
exchange.getAttributes().put(Router.HTTP_MESSAGE_WRITERS_ATTRIBUTE,
|
||||||
|
(Supplier<Stream<HttpMessageWriter<?>>>) messageWriters::stream);
|
||||||
|
|
||||||
|
|
||||||
publisherResponse.writeTo(exchange).block();
|
publisherResponse.writeTo(exchange).block();
|
||||||
|
|
Loading…
Reference in New Issue