Use ParameterizedTypeReference in public-facing WebFlux APIs

This commit changes the use of `ResolvableType` to
`ParameterizedTypeReference` in all public-facing WebFlux APIs. This
change removes the necessity for providing the parameterized type
information twice: once for creating the `ResolvableType`, and once for
specifying a `BodyExtractor`.

Issue: SPR-15636
This commit is contained in:
Arjen Poutsma 2017-06-07 17:29:56 +02:00
parent b6c09fa76a
commit 5e954dcba0
10 changed files with 119 additions and 58 deletions

View File

@ -32,18 +32,20 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.test.util.JsonExpectationsHelper; import org.springframework.test.util.JsonExpectationsHelper;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
@ -289,20 +291,19 @@ class DefaultWebTestClient implements WebTestClient {
this.timeout = timeout; this.timeout = timeout;
} }
@SuppressWarnings("unchecked") public <T> EntityExchangeResult<T> decode(BodyExtractor<Mono<T>, ? super ClientHttpResponse> extractor) {
public <T> EntityExchangeResult<T> decode(ResolvableType bodyType) { T body = this.response.body(extractor).block(this.timeout);
T body = (T) this.response.body(toMono(bodyType)).block(this.timeout);
return new EntityExchangeResult<>(this, body); return new EntityExchangeResult<>(this, body);
} }
public <T> EntityExchangeResult<List<T>> decodeToList(ResolvableType elementType) { public <T> EntityExchangeResult<List<T>> decodeToList(BodyExtractor<Flux<T>, ? super ClientHttpResponse> extractor) {
Flux<T> flux = this.response.body(toFlux(elementType)); Flux<T> flux = this.response.body(extractor);
List<T> body = flux.collectList().block(this.timeout); List<T> body = flux.collectList().block(this.timeout);
return new EntityExchangeResult<>(this, body); return new EntityExchangeResult<>(this, body);
} }
public <T> FluxExchangeResult<T> decodeToFlux(ResolvableType elementType) { public <T> FluxExchangeResult<T> decodeToFlux(BodyExtractor<Flux<T>, ? super ClientHttpResponse> extractor) {
Flux<T> body = this.response.body(toFlux(elementType)); Flux<T> body = this.response.body(extractor);
return new FluxExchangeResult<>(this, body, this.timeout); return new FluxExchangeResult<>(this, body, this.timeout);
} }
@ -333,25 +334,23 @@ class DefaultWebTestClient implements WebTestClient {
} }
@Override @Override
@SuppressWarnings("unchecked")
public <B> BodySpec<B, ?> expectBody(Class<B> bodyType) { public <B> BodySpec<B, ?> expectBody(Class<B> bodyType) {
return (BodySpec<B, ?>) expectBody(ResolvableType.forClass(bodyType)); return new DefaultBodySpec<>(this.result.decode(toMono(bodyType)));
} }
@Override @Override
@SuppressWarnings({"rawtypes", "unchecked"}) public <B> BodySpec<B, ?> expectBody(ParameterizedTypeReference<B> bodyType) {
public <B> BodySpec<B, ?> expectBody(ResolvableType bodyType) { return new DefaultBodySpec<>(this.result.decode(toMono(bodyType)));
return new DefaultBodySpec(this.result.decode(bodyType));
} }
@Override @Override
public <E> ListBodySpec<E> expectBodyList(Class<E> elementType) { public <E> ListBodySpec<E> expectBodyList(Class<E> elementType) {
return expectBodyList(ResolvableType.forClass(elementType)); return new DefaultListBodySpec<>(this.result.decodeToList(toFlux(elementType)));
} }
@Override @Override
public <E> ListBodySpec<E> expectBodyList(ResolvableType elementType) { public <E> ListBodySpec<E> expectBodyList(ParameterizedTypeReference<E> elementType) {
return new DefaultListBodySpec<>(this.result.decodeToList(elementType)); return new DefaultListBodySpec<>(this.result.decodeToList(toFlux(elementType)));
} }
@Override @Override
@ -361,12 +360,12 @@ class DefaultWebTestClient implements WebTestClient {
@Override @Override
public <T> FluxExchangeResult<T> returnResult(Class<T> elementType) { public <T> FluxExchangeResult<T> returnResult(Class<T> elementType) {
return returnResult(ResolvableType.forClass(elementType)); return this.result.decodeToFlux(toFlux(elementType));
} }
@Override @Override
public <T> FluxExchangeResult<T> returnResult(ResolvableType elementType) { public <T> FluxExchangeResult<T> returnResult(ParameterizedTypeReference<T> elementType) {
return this.result.decodeToFlux(elementType); return this.result.decodeToFlux(toFlux(elementType));
} }
} }

View File

@ -28,7 +28,7 @@ import java.util.function.Function;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.core.ResolvableType; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.format.FormatterRegistry; import org.springframework.format.FormatterRegistry;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
@ -534,7 +534,7 @@ public interface WebTestClient {
/** /**
* Variant of {@link #expectBody(Class)} for a body type with generics. * Variant of {@link #expectBody(Class)} for a body type with generics.
*/ */
<B> BodySpec<B, ?> expectBody(ResolvableType bodyType); <B> BodySpec<B, ?> expectBody(ParameterizedTypeReference<B> bodyType);
/** /**
* Declare expectations on the response body decoded to {@code List<E>}. * Declare expectations on the response body decoded to {@code List<E>}.
@ -545,7 +545,7 @@ public interface WebTestClient {
/** /**
* Variant of {@link #expectBodyList(Class)} for element types with generics. * Variant of {@link #expectBodyList(Class)} for element types with generics.
*/ */
<E> ListBodySpec<E> expectBodyList(ResolvableType elementType); <E> ListBodySpec<E> expectBodyList(ParameterizedTypeReference<E> elementType);
/** /**
* Declare expectations on the response body content. * Declare expectations on the response body content.
@ -565,7 +565,7 @@ public interface WebTestClient {
/** /**
* Variant of {@link #returnResult(Class)} for element types with generics. * Variant of {@link #returnResult(Class)} for element types with generics.
*/ */
<T> FluxExchangeResult<T> returnResult(ResolvableType elementType); <T> FluxExchangeResult<T> returnResult(ParameterizedTypeReference<T> elementType);
} }
/** /**

View File

@ -26,6 +26,7 @@ import org.junit.Test;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.test.web.reactive.server.FluxExchangeResult; import org.springframework.test.web.reactive.server.FluxExchangeResult;
@ -39,9 +40,7 @@ import org.springframework.web.bind.annotation.RestController;
import static java.time.Duration.ofMillis; import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.endsWith;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import static org.springframework.core.ResolvableType.forClassWithGenerics;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
/** /**
@ -98,7 +97,7 @@ public class ResponseEntityTests {
this.client.get().uri("/persons?map=true") this.client.get().uri("/persons?map=true")
.exchange() .exchange()
.expectStatus().isOk() .expectStatus().isOk()
.expectBody(forClassWithGenerics(Map.class, String.class, Person.class)).isEqualTo(map); .expectBody(new ParameterizedTypeReference<Map<String, Person>>() {}).isEqualTo(map);
} }
@Test @Test

View File

@ -25,6 +25,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMessage; import org.springframework.http.HttpMessage;
@ -68,11 +69,25 @@ public abstract class BodyExtractors {
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}. * Return a {@code BodyExtractor} that reads into a Reactor {@link Mono}.
* @param elementType the type of element in the {@code Mono} * The given {@link ParameterizedTypeReference} is used to pass generic type information, for
* instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient}
* <pre class="code">
* Mono&lt;Map&lt;String, String&gt;&gt; body = this.webClient
* .get()
* .uri("http://example.com")
* .exchange()
* .flatMap(r -> r.body(toMono(new ParameterizedTypeReference&lt;Map&lt;String,String&gt;&gt;() {})));
* </pre>
* @param typeReference a reference to the type of element in the {@code Mono}
* @param <T> the element type * @param <T> the element type
* @return a {@code BodyExtractor} that reads a mono * @return a {@code BodyExtractor} that reads a mono
*/ */
public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference<T> typeReference) {
Assert.notNull(typeReference, "'typeReference' must not be null");
return toMono(ResolvableType.forType(typeReference.getType()));
}
static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(elementType, "'elementType' must not be null");
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
elementType, elementType,
@ -93,7 +108,7 @@ public abstract class BodyExtractors {
* Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}.
* @param elementClass the class of element in the {@code Flux} * @param elementClass the class of element in the {@code Flux}
* @param <T> the element type * @param <T> the element type
* @return a {@code BodyExtractor} that reads a mono * @return a {@code BodyExtractor} that reads a flux
*/ */
public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) { public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) {
Assert.notNull(elementClass, "'elementClass' must not be null"); Assert.notNull(elementClass, "'elementClass' must not be null");
@ -102,11 +117,25 @@ public abstract class BodyExtractors {
/** /**
* Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}. * Return a {@code BodyExtractor} that reads into a Reactor {@link Flux}.
* @param elementType the type of element in the {@code Flux} * The given {@link ParameterizedTypeReference} is used to pass generic type information, for
* instance when using the {@link org.springframework.web.reactive.function.client.WebClient WebClient}
* <pre class="code">
* Flux&lt;ServerSentEvent&lt;String&gt;&gt; body = this.webClient
* .get()
* .uri("http://example.com")
* .exchange()
* .flatMap(r -> r.body(toFlux(new ParameterizedTypeReference&lt;ServerSentEvent&lt;String&gt;&gt;() {})));
* </pre>
* @param typeReference a reference to the type of element in the {@code Flux}
* @param <T> the element type * @param <T> the element type
* @return a {@code BodyExtractor} that reads a mono * @return a {@code BodyExtractor} that reads a flux
*/ */
public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference<T> typeReference) {
Assert.notNull(typeReference, "'typeReference' must not be null");
return toFlux(ResolvableType.forType(typeReference.getType()));
}
static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(elementType, "'elementType' must not be null");
return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, return (inputMessage, context) -> readWithMessageReaders(inputMessage, context,
elementType, elementType,

View File

@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
@ -100,17 +101,17 @@ public abstract class BodyInserters {
/** /**
* Return a {@code BodyInserter} that writes the given {@link Publisher}. * Return a {@code BodyInserter} that writes the given {@link Publisher}.
* @param publisher the publisher to stream to the response body * @param publisher the publisher to stream to the response body
* @param elementType the type of elements contained in the publisher * @param typeReference the type of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher * @param <T> the type of the elements contained in the publisher
* @param <P> the type of the {@code Publisher} * @param <P> the type of the {@code Publisher}
* @return a {@code BodyInserter} that writes a {@code Publisher} * @return a {@code BodyInserter} that writes a {@code Publisher}
*/ */
public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher( public static <T, P extends Publisher<T>> BodyInserter<P, ReactiveHttpOutputMessage> fromPublisher(
P publisher, ResolvableType elementType) { P publisher, ParameterizedTypeReference<T> typeReference) {
Assert.notNull(publisher, "'publisher' must not be null"); Assert.notNull(publisher, "'publisher' must not be null");
Assert.notNull(elementType, "'elementType' must not be null"); Assert.notNull(typeReference, "'typeReference' must not be null");
return bodyInserterFor(publisher, elementType); return bodyInserterFor(publisher, ResolvableType.forType(typeReference.getType()));
} }
/** /**
@ -197,7 +198,7 @@ public abstract class BodyInserters {
* Return a {@code BodyInserter} that writes the given {@code Publisher} publisher as * Return a {@code BodyInserter} that writes the given {@code Publisher} publisher as
* Server-Sent Events. * Server-Sent Events.
* @param eventsPublisher the publisher to write to the response body as Server-Sent Events * @param eventsPublisher the publisher to write to the response body as Server-Sent Events
* @param eventType the type of event contained in the publisher * @param typeReference the type of event contained in the publisher
* @param <T> the type of the elements contained in the publisher * @param <T> the type of the elements contained in the publisher
* @return a {@code BodyInserter} that writes the given {@code Publisher} publisher as * @return a {@code BodyInserter} that writes the given {@code Publisher} publisher as
* Server-Sent Events * Server-Sent Events
@ -207,6 +208,15 @@ public abstract class BodyInserters {
// ReactiveHttpOutputMessage like other methods, since sending SSEs only typically happens on // ReactiveHttpOutputMessage like other methods, since sending SSEs only typically happens on
// the server-side // the server-side
public static <T, S extends Publisher<T>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(S eventsPublisher, public static <T, S extends Publisher<T>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(S eventsPublisher,
ParameterizedTypeReference<T> typeReference) {
Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null");
Assert.notNull(typeReference, "'typeReference' must not be null");
return fromServerSentEvents(eventsPublisher,
ResolvableType.forType(typeReference.getType()));
}
static <T, S extends Publisher<T>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(S eventsPublisher,
ResolvableType eventType) { ResolvableType eventType) {
Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null"); Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null");

View File

@ -23,7 +23,7 @@ import java.util.Set;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.CacheControl; import org.springframework.http.CacheControl;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
@ -81,14 +81,14 @@ public interface EntityResponse<T> extends ServerResponse {
/** /**
* Create a builder with the given publisher. * Create a builder with the given publisher.
* @param publisher the publisher that represents the body of the response * @param publisher the publisher that represents the body of the response
* @param elementType the type of elements contained in the publisher * @param typeReference the type of elements contained in the publisher
* @param <T> the type of the elements contained in the publisher * @param <T> the type of the elements contained in the publisher
* @param <P> the type of the {@code Publisher} * @param <P> the type of the {@code Publisher}
* @return the created builder * @return the created builder
*/ */
static <T, P extends Publisher<T>> Builder<P> fromPublisher(P publisher, ResolvableType elementType) { static <T, P extends Publisher<T>> Builder<P> fromPublisher(P publisher, ParameterizedTypeReference<T> typeReference) {
return new DefaultEntityResponseBuilder<>(publisher, return new DefaultEntityResponseBuilder<>(publisher,
BodyInserters.fromPublisher(publisher, elementType)); BodyInserters.fromPublisher(publisher, typeReference));
} }

View File

@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -32,6 +33,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.ByteBufferDecoder; import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.StringDecoder; import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
@ -120,6 +122,28 @@ public class BodyExtractorsTests {
.verify(); .verify();
} }
@Test
public void toMonoParameterizedTypeReference() throws Exception {
ParameterizedTypeReference<Map<String, String>> typeReference = new ParameterizedTypeReference<Map<String, String>>() {};
BodyExtractor<Mono<Map<String, String>>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(typeReference);
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("{\"username\":\"foo\",\"password\":\"bar\"}".getBytes(StandardCharsets.UTF_8)));
Flux<DataBuffer> body = Flux.just(dataBuffer);
MockServerHttpRequest request = MockServerHttpRequest.post("/").contentType(MediaType.APPLICATION_JSON).body(body);
Mono<Map<String, String>> result = extractor.extract(request, this.context);
Map<String, String > expected = new LinkedHashMap<>();
expected.put("username", "foo");
expected.put("password", "bar");
StepVerifier.create(result)
.expectNext(expected)
.expectComplete()
.verify();
}
@Test @Test
public void toMonoWithHints() throws Exception { public void toMonoWithHints() throws Exception {
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class); BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);

View File

@ -30,7 +30,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -70,10 +70,10 @@ public class DefaultEntityResponseBuilderTests {
} }
@Test @Test
public void fromPublisherResolvableType() throws Exception { public void fromPublisher() throws Exception {
Flux<String> body = Flux.just("foo", "bar"); Flux<String> body = Flux.just("foo", "bar");
ResolvableType type = ResolvableType.forClass(String.class); ParameterizedTypeReference<String> typeReference = new ParameterizedTypeReference<String>() {};
EntityResponse<Flux<String>> response = EntityResponse.fromPublisher(body, type).build().block(); EntityResponse<Flux<String>> response = EntityResponse.fromPublisher(body, typeReference).build().block();
assertSame(body, response.entity()); assertSame(body, response.entity());
} }

View File

@ -24,15 +24,15 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.springframework.core.ResolvableType.*; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.http.MediaType.*; import static org.springframework.web.reactive.function.BodyExtractors.toFlux;
import static org.springframework.web.reactive.function.BodyExtractors.*; import static org.springframework.web.reactive.function.BodyInserters.fromServerSentEvents;
import static org.springframework.web.reactive.function.BodyInserters.*; import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RouterFunctions.*;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
@ -94,7 +94,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn
.accept(TEXT_EVENT_STREAM) .accept(TEXT_EVENT_STREAM)
.exchange() .exchange()
.flatMapMany(response -> response.body(toFlux( .flatMapMany(response -> response.body(toFlux(
forClassWithGenerics(ServerSentEvent.class, String.class)))); new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith( event -> { .consumeNextWith( event -> {

View File

@ -26,6 +26,7 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
@ -38,9 +39,9 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.springframework.core.ResolvableType.*; import static org.springframework.core.ResolvableType.forClassWithGenerics;
import static org.springframework.http.MediaType.*; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.web.reactive.function.BodyExtractors.*; import static org.springframework.web.reactive.function.BodyExtractors.toFlux;
/** /**
* @author Sebastien Deleuze * @author Sebastien Deleuze
@ -106,7 +107,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/event") .uri("/event")
.accept(TEXT_EVENT_STREAM) .accept(TEXT_EVENT_STREAM)
.exchange() .exchange()
.flatMapMany(response -> response.body(toFlux(type))); .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith( event -> { .consumeNextWith( event -> {
@ -133,8 +134,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.uri("/event") .uri("/event")
.accept(TEXT_EVENT_STREAM) .accept(TEXT_EVENT_STREAM)
.exchange() .exchange()
.flatMapMany(response -> response.body(toFlux( .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})));
forClassWithGenerics(ServerSentEvent.class, String.class))));
StepVerifier.create(result) StepVerifier.create(result)
.consumeNextWith( event -> { .consumeNextWith( event -> {