Refactored SseEvent to ServerSentEvent
- Renamed SseEvent to ServerSentEvent to make the name less redundant. - ServerSentEvent is now immutable, having a builder to create new instances. - Realigned the class properties to more closely match the events described in the spec, so that `reconnectTime` becomes `retry`, and `name` becomes `event`.
This commit is contained in:
parent
d9eaa5f3ac
commit
16b525f698
|
@ -44,7 +44,7 @@ import org.springframework.http.codec.EncoderHttpMessageWriter;
|
||||||
import org.springframework.http.codec.HttpMessageReader;
|
import org.springframework.http.codec.HttpMessageReader;
|
||||||
import org.springframework.http.codec.HttpMessageWriter;
|
import org.springframework.http.codec.HttpMessageWriter;
|
||||||
import org.springframework.http.codec.ResourceHttpMessageWriter;
|
import org.springframework.http.codec.ResourceHttpMessageWriter;
|
||||||
import org.springframework.http.codec.SseEventHttpMessageWriter;
|
import org.springframework.http.codec.ServerSentEventHttpMessageWriter;
|
||||||
import org.springframework.http.codec.json.Jackson2JsonDecoder;
|
import org.springframework.http.codec.json.Jackson2JsonDecoder;
|
||||||
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
||||||
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
|
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
|
||||||
|
@ -372,7 +372,7 @@ public class WebReactiveConfiguration implements ApplicationContextAware {
|
||||||
writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder));
|
writers.add(new EncoderHttpMessageWriter<>(jacksonEncoder));
|
||||||
sseDataEncoders.add(jacksonEncoder);
|
sseDataEncoders.add(jacksonEncoder);
|
||||||
}
|
}
|
||||||
writers.add(new SseEventHttpMessageWriter(sseDataEncoders));
|
writers.add(new ServerSentEventHttpMessageWriter(sseDataEncoders));
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Override this to modify the list of message writers after it has been
|
* Override this to modify the list of message writers after it has been
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||||
import org.springframework.http.codec.SseEvent;
|
import org.springframework.http.codec.ServerSentEvent;
|
||||||
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
|
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
|
||||||
import org.springframework.http.server.reactive.HttpHandler;
|
import org.springframework.http.server.reactive.HttpHandler;
|
||||||
import org.springframework.tests.TestSubscriber;
|
import org.springframework.tests.TestSubscriber;
|
||||||
|
@ -135,14 +135,11 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@RequestMapping("/sse/event")
|
@RequestMapping("/sse/event")
|
||||||
Flux<SseEvent> sse() {
|
Flux<ServerSentEvent<String>> sse() {
|
||||||
return Flux.interval(Duration.ofMillis(100)).map(l -> {
|
return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo")
|
||||||
SseEvent event = new SseEvent();
|
.id(Long.toString(l))
|
||||||
event.setId(Long.toString(l));
|
.comment("bar")
|
||||||
event.setData("foo");
|
.build()).take(2);
|
||||||
event.setComment("bar");
|
|
||||||
return event;
|
|
||||||
}).take(2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,226 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2016 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.http.codec;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Representation for a Server-Sent Event for use with Spring's reactive Web
|
||||||
|
* support. {@code Flux<SseEvent>} or {@code Observable<SseEvent>} is the
|
||||||
|
* reactive equivalent to Spring MVC's {@code SseEmitter}.
|
||||||
|
*
|
||||||
|
* @param <T> the type of data that this event contains
|
||||||
|
* @author Sebastien Deleuze
|
||||||
|
* @author Arjen Poutsma
|
||||||
|
* @see ServerSentEventHttpMessageWriter
|
||||||
|
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
|
||||||
|
* @since 5.0
|
||||||
|
*/
|
||||||
|
public class ServerSentEvent<T> {
|
||||||
|
|
||||||
|
private final String id;
|
||||||
|
|
||||||
|
private final String event;
|
||||||
|
|
||||||
|
private final T data;
|
||||||
|
|
||||||
|
private final Duration retry;
|
||||||
|
|
||||||
|
private final String comment;
|
||||||
|
|
||||||
|
private ServerSentEvent(String id, String event, T data, Duration retry, String comment) {
|
||||||
|
this.id = id;
|
||||||
|
this.event = event;
|
||||||
|
this.data = data;
|
||||||
|
this.retry = retry;
|
||||||
|
this.comment = comment;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a builder for a {@code SseEvent}.
|
||||||
|
*
|
||||||
|
* @param <T> the type of data that this event contains
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public static <T> Builder<T> builder() {
|
||||||
|
return new BuilderImpl<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a builder for a {@code SseEvent}, populated with the give {@linkplain #data() data}.
|
||||||
|
*
|
||||||
|
* @param <T> the type of data that this event contains
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public static <T> Builder<T> builder(T data) {
|
||||||
|
return new BuilderImpl<>(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@code id} field of this event, if available.
|
||||||
|
*/
|
||||||
|
public Optional<String> id() {
|
||||||
|
return Optional.ofNullable(this.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@code event} field of this event, if available.
|
||||||
|
*/
|
||||||
|
public Optional<String> event() {
|
||||||
|
return Optional.ofNullable(this.event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@code data} field of this event, if available.
|
||||||
|
*/
|
||||||
|
public Optional<T> data() {
|
||||||
|
return Optional.ofNullable(this.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@code retry} field of this event, if available.
|
||||||
|
*/
|
||||||
|
public Optional<Duration> retry() {
|
||||||
|
return Optional.ofNullable(this.retry);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the comment of this event, if available.
|
||||||
|
*/
|
||||||
|
public Optional<String> comment() {
|
||||||
|
return Optional.ofNullable(this.comment);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A mutable builder for a {@code SseEvent}.
|
||||||
|
*
|
||||||
|
* @param <T> the type of data that this event contains
|
||||||
|
*/
|
||||||
|
public interface Builder<T> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the value of the {@code id} field.
|
||||||
|
*
|
||||||
|
* @param id the value of the id field
|
||||||
|
* @return {@code this} builder
|
||||||
|
*/
|
||||||
|
Builder<T> id(String id);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the value of the {@code event} field.
|
||||||
|
*
|
||||||
|
* @param event the value of the event field
|
||||||
|
* @return {@code this} builder
|
||||||
|
*/
|
||||||
|
Builder<T> event(String event);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the value of the {@code data} field. If the {@code data} argument is a multi-line {@code String}, it
|
||||||
|
* will be turned into multiple {@code data} field lines as defined in Server-Sent Events
|
||||||
|
* W3C recommendation. If {@code data} is not a String, it will be
|
||||||
|
* {@linkplain Jackson2JsonEncoder encoded} into JSON.
|
||||||
|
*
|
||||||
|
* @param data the value of the data field
|
||||||
|
* @return {@code this} builder
|
||||||
|
*/
|
||||||
|
Builder<T> data(T data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the value of the {@code retry} field.
|
||||||
|
*
|
||||||
|
* @param retry the value of the retry field
|
||||||
|
* @return {@code this} builder
|
||||||
|
*/
|
||||||
|
Builder<T> retry(Duration retry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set SSE comment. If a multi-line comment is provided, it will be turned into multiple
|
||||||
|
* SSE comment lines as defined in Server-Sent Events W3C
|
||||||
|
* recommendation.
|
||||||
|
*
|
||||||
|
* @param comment the comment to set
|
||||||
|
* @return {@code this} builder
|
||||||
|
*/
|
||||||
|
Builder<T> comment(String comment);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds the event.
|
||||||
|
*
|
||||||
|
* @return the built event
|
||||||
|
*/
|
||||||
|
ServerSentEvent<T> build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class BuilderImpl<T> implements Builder<T> {
|
||||||
|
|
||||||
|
private T data;
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String event;
|
||||||
|
|
||||||
|
private Duration retry;
|
||||||
|
|
||||||
|
private String comment;
|
||||||
|
|
||||||
|
public BuilderImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public BuilderImpl(T data) {
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder<T> id(String id) {
|
||||||
|
this.id = id;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder<T> event(String event) {
|
||||||
|
this.event = event;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder<T> data(T data) {
|
||||||
|
this.data = data;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder<T> retry(Duration retry) {
|
||||||
|
this.retry = retry;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder<T> comment(String comment) {
|
||||||
|
this.comment = comment;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerSentEvent<T> build() {
|
||||||
|
return new ServerSentEvent<T>(this.id, this.event, this.data, this.retry, this.comment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -33,24 +33,29 @@ import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ReactiveHttpOutputMessage;
|
import org.springframework.http.ReactiveHttpOutputMessage;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
import org.springframework.util.MimeTypeUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encoder that supports a stream of {@link SseEvent}s and also plain
|
* Encoder that supports a stream of {@link ServerSentEvent}s and also plain
|
||||||
* {@link Object}s which is the same as an {@link SseEvent} with data
|
* {@link Object}s which is the same as an {@link ServerSentEvent} with data
|
||||||
* only.
|
* only.
|
||||||
*
|
*
|
||||||
* @author Sebastien Deleuze
|
* @author Sebastien Deleuze
|
||||||
* @since 5.0
|
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
|
* @since 5.0
|
||||||
*/
|
*/
|
||||||
public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
|
public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Object> {
|
||||||
|
|
||||||
private static final MediaType TEXT_EVENT_STREAM =
|
private static final MediaType TEXT_EVENT_STREAM =
|
||||||
new MediaType("text", "event-stream");
|
new MediaType("text", "event-stream");
|
||||||
|
|
||||||
private final List<Encoder<?>> dataEncoders;
|
private final List<Encoder<?>> dataEncoders;
|
||||||
|
|
||||||
public SseEventHttpMessageWriter(List<Encoder<?>> dataEncoders) {
|
public ServerSentEventHttpMessageWriter() {
|
||||||
|
this.dataEncoders = Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServerSentEventHttpMessageWriter(List<Encoder<?>> dataEncoders) {
|
||||||
Assert.notNull(dataEncoders, "'dataEncoders' must not be null");
|
Assert.notNull(dataEncoders, "'dataEncoders' must not be null");
|
||||||
this.dataEncoders = dataEncoders;
|
this.dataEncoders = dataEncoders;
|
||||||
}
|
}
|
||||||
|
@ -67,7 +72,7 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> write(Publisher<?> inputStream, ResolvableType type,
|
public Mono<Void> write(Publisher<?> inputStream, ResolvableType type,
|
||||||
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
|
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
|
||||||
|
|
||||||
outputMessage.getHeaders().setContentType(TEXT_EVENT_STREAM);
|
outputMessage.getHeaders().setContentType(TEXT_EVENT_STREAM);
|
||||||
|
|
||||||
|
@ -82,68 +87,60 @@ public class SseEventHttpMessageWriter implements HttpMessageWriter<Object> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Flux<Publisher<DataBuffer>> encode(Publisher<?> inputStream,
|
private Flux<Publisher<DataBuffer>> encode(Publisher<?> inputStream,
|
||||||
DataBufferFactory bufferFactory, ResolvableType type) {
|
DataBufferFactory bufferFactory, ResolvableType type) {
|
||||||
|
|
||||||
return Flux.from(inputStream).map(input -> {
|
return Flux.from(inputStream)
|
||||||
SseEvent event =
|
.map(o -> toSseEvent(o, type))
|
||||||
(SseEvent.class.equals(type.getRawClass()) ? (SseEvent) input :
|
.map(sse -> {
|
||||||
new SseEvent(input));
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sse.id().ifPresent(id -> writeField("id", id, sb));
|
||||||
|
sse.event().ifPresent(event -> writeField("event", event, sb));
|
||||||
|
sse.retry().ifPresent(retry -> writeField("retry", retry.toMillis(), sb));
|
||||||
|
sse.comment().ifPresent(comment -> {
|
||||||
|
comment = comment.replaceAll("\\n", "\n:");
|
||||||
|
sb.append(':').append(comment).append("\n");
|
||||||
|
});
|
||||||
|
Flux<DataBuffer> dataBuffer = sse.data()
|
||||||
|
.<Flux<DataBuffer>>map(data -> {
|
||||||
|
sb.append("data:");
|
||||||
|
if (data instanceof String) {
|
||||||
|
String stringData = ((String) data).replaceAll("\\n", "\ndata:");
|
||||||
|
sb.append(stringData).append('\n');
|
||||||
|
return Flux.empty();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return applyEncoder(data, bufferFactory);
|
||||||
|
}
|
||||||
|
}).orElse(Flux.empty());
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer,
|
||||||
|
encodeString("\n", bufferFactory));
|
||||||
if (event.getId() != null) {
|
});
|
||||||
sb.append("id:");
|
|
||||||
sb.append(event.getId());
|
|
||||||
sb.append("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.getName() != null) {
|
|
||||||
sb.append("event:");
|
|
||||||
sb.append(event.getName());
|
|
||||||
sb.append("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.getReconnectTime() != null) {
|
|
||||||
sb.append("retry:");
|
|
||||||
sb.append(event.getReconnectTime().toString());
|
|
||||||
sb.append("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.getComment() != null) {
|
|
||||||
sb.append(":");
|
|
||||||
sb.append(event.getComment().replaceAll("\\n", "\n:"));
|
|
||||||
sb.append("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
Object data = event.getData();
|
|
||||||
Flux<DataBuffer> dataBuffer = Flux.empty();
|
|
||||||
MediaType mediaType =
|
|
||||||
(event.getMediaType() == null ? MediaType.ALL : event.getMediaType());
|
|
||||||
if (data != null) {
|
|
||||||
sb.append("data:");
|
|
||||||
if (data instanceof String) {
|
|
||||||
sb.append(((String) data).replaceAll("\\n", "\ndata:")).append("\n");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
dataBuffer = applyEncoder(data, mediaType, bufferFactory);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer,
|
|
||||||
encodeString("\n", bufferFactory));
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ServerSentEvent<?> toSseEvent(Object data, ResolvableType type) {
|
||||||
|
return ServerSentEvent.class.isAssignableFrom(type.getRawClass())
|
||||||
|
? (ServerSentEvent<?>) data
|
||||||
|
: ServerSentEvent.builder().data(data).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) {
|
||||||
|
stringBuilder.append(fieldName);
|
||||||
|
stringBuilder.append(':');
|
||||||
|
stringBuilder.append(fieldValue.toString());
|
||||||
|
stringBuilder.append("\n");
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private <T> Flux<DataBuffer> applyEncoder(Object data, MediaType mediaType, DataBufferFactory bufferFactory) {
|
private <T> Flux<DataBuffer> applyEncoder(Object data, DataBufferFactory bufferFactory) {
|
||||||
ResolvableType elementType = ResolvableType.forClass(data.getClass());
|
ResolvableType elementType = ResolvableType.forClass(data.getClass());
|
||||||
Optional<Encoder<?>> encoder = dataEncoders
|
Optional<Encoder<?>> encoder = dataEncoders
|
||||||
.stream()
|
.stream()
|
||||||
.filter(e -> e.canEncode(elementType, mediaType))
|
.filter(e -> e.canEncode(elementType, MimeTypeUtils.APPLICATION_JSON))
|
||||||
.findFirst();
|
.findFirst();
|
||||||
return ((Encoder<T>) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!")))
|
return ((Encoder<T>) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!")))
|
||||||
.encode(Mono.just((T) data), bufferFactory, elementType, mediaType)
|
.encode(Mono.just((T) data), bufferFactory, elementType, MimeTypeUtils.APPLICATION_JSON)
|
||||||
.concatWith(encodeString("\n", bufferFactory));
|
.concatWith(encodeString("\n", bufferFactory));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,165 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2016 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.http.codec;
|
|
||||||
|
|
||||||
import org.springframework.http.MediaType;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Representation for a Server-Sent Event for use with Spring's reactive Web
|
|
||||||
* support. {@code Flux<SseEvent>} or {@code Observable<SseEvent>} is the
|
|
||||||
* reactive equivalent to Spring MVC's {@code SseEmitter}.
|
|
||||||
*
|
|
||||||
* @author Sebastien Deleuze
|
|
||||||
* @since 5.0
|
|
||||||
* @see SseEventHttpMessageWriter
|
|
||||||
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
|
|
||||||
*/
|
|
||||||
public class SseEvent {
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
|
|
||||||
private String name;
|
|
||||||
|
|
||||||
private Object data;
|
|
||||||
|
|
||||||
private MediaType mediaType;
|
|
||||||
|
|
||||||
private Long reconnectTime;
|
|
||||||
|
|
||||||
private String comment;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an empty instance.
|
|
||||||
*/
|
|
||||||
public SseEvent() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an instance with the provided {@code data}.
|
|
||||||
*/
|
|
||||||
public SseEvent(Object data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an instance with the provided {@code data} and {@code mediaType}.
|
|
||||||
*/
|
|
||||||
public SseEvent(Object data, MediaType mediaType) {
|
|
||||||
this.data = data;
|
|
||||||
this.mediaType = mediaType;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the {@code id} SSE field
|
|
||||||
*/
|
|
||||||
public void setId(String id) {
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setId(String)
|
|
||||||
*/
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the {@code event} SSE field
|
|
||||||
*/
|
|
||||||
public void setName(String name) {
|
|
||||||
this.name = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setName(String)
|
|
||||||
*/
|
|
||||||
public String getName() {
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set {@code data} SSE field. If a multiline {@code String} is provided, it will be
|
|
||||||
* turned into multiple {@code data} field lines as defined in Server-Sent Events
|
|
||||||
* W3C recommendation.
|
|
||||||
*
|
|
||||||
* If no {@code mediaType} is defined, default {@link SseEventHttpMessageWriter} will:
|
|
||||||
* - Turn single line {@code String} to a single {@code data} field
|
|
||||||
* - Turn multiline line {@code String} to multiple {@code data} fields
|
|
||||||
* - Serialize other {@code Object} as JSON
|
|
||||||
*
|
|
||||||
* @see #setMediaType(MediaType)
|
|
||||||
*/
|
|
||||||
public void setData(Object data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setData(Object)
|
|
||||||
*/
|
|
||||||
public Object getData() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the {@link MediaType} used to serialize the {@code data}.
|
|
||||||
* {@link SseEventHttpMessageWriter} should be configured with the relevant encoder to be
|
|
||||||
* able to serialize it.
|
|
||||||
*/
|
|
||||||
public void setMediaType(MediaType mediaType) {
|
|
||||||
this.mediaType = mediaType;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setMediaType(MediaType)
|
|
||||||
*/
|
|
||||||
public MediaType getMediaType() {
|
|
||||||
return this.mediaType;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the {@code retry} SSE field
|
|
||||||
*/
|
|
||||||
public void setReconnectTime(Long reconnectTime) {
|
|
||||||
this.reconnectTime = reconnectTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setReconnectTime(Long)
|
|
||||||
*/
|
|
||||||
public Long getReconnectTime() {
|
|
||||||
return reconnectTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set SSE comment. If a multiline comment is provided, it will be turned into multiple
|
|
||||||
* SSE comment lines by {@link SseEventHttpMessageWriter} as defined in Server-Sent Events W3C
|
|
||||||
* recommendation.
|
|
||||||
*/
|
|
||||||
public void setComment(String comment) {
|
|
||||||
this.comment = comment;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #setComment(String)
|
|
||||||
*/
|
|
||||||
public String getComment() {
|
|
||||||
return comment;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.http.codec;
|
package org.springframework.http.codec;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -31,44 +32,43 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
||||||
import org.springframework.http.server.reactive.MockServerHttpResponse;
|
import org.springframework.http.server.reactive.MockServerHttpResponse;
|
||||||
import org.springframework.tests.TestSubscriber;
|
import org.springframework.tests.TestSubscriber;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Sebastien Deleuze
|
* @author Sebastien Deleuze
|
||||||
*/
|
*/
|
||||||
public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase {
|
public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
|
|
||||||
private SseEventHttpMessageWriter converter = new SseEventHttpMessageWriter(
|
private ServerSentEventHttpMessageWriter messageWriter = new ServerSentEventHttpMessageWriter(
|
||||||
Collections.singletonList(new Jackson2JsonEncoder()));
|
Collections.singletonList(new Jackson2JsonEncoder()));
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void nullMimeType() {
|
public void nullMimeType() {
|
||||||
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class), null));
|
assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class), null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void unsupportedMimeType() {
|
public void unsupportedMimeType() {
|
||||||
assertFalse(converter.canWrite(ResolvableType.forClass(Object.class),
|
assertFalse(messageWriter.canWrite(ResolvableType.forClass(Object.class),
|
||||||
new MediaType("foo", "bar")));
|
new MediaType("foo", "bar")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void supportedMimeType() {
|
public void supportedMimeType() {
|
||||||
assertTrue(converter.canWrite(ResolvableType.forClass(Object.class),
|
assertTrue(messageWriter.canWrite(ResolvableType.forClass(Object.class),
|
||||||
new MediaType("text", "event-stream")));
|
new MediaType("text", "event-stream")));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void encodeServerSentEvent() {
|
public void encodeServerSentEvent() {
|
||||||
SseEvent event = new SseEvent();
|
ServerSentEvent<String>
|
||||||
event.setId("c42");
|
event = ServerSentEvent.<String>builder().data("bar").id("c42").event("foo").comment("bla\nbla bla\nbla bla bla")
|
||||||
event.setName("foo");
|
.retry(Duration.ofMillis(123L)).build();
|
||||||
event.setComment("bla\nbla bla\nbla bla bla");
|
Mono<ServerSentEvent<String>> source = Mono.just(event);
|
||||||
event.setReconnectTime(123L);
|
|
||||||
Mono<SseEvent> source = Mono.just(event);
|
|
||||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||||
converter.write(source, ResolvableType.forClass(SseEvent.class),
|
messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class),
|
||||||
new MediaType("text", "event-stream"), outputMessage);
|
new MediaType("text", "event-stream"), outputMessage);
|
||||||
|
|
||||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||||
|
@ -77,7 +77,8 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
|
||||||
assertValuesWith(publisher -> {
|
assertValuesWith(publisher -> {
|
||||||
TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith(
|
TestSubscriber.subscribe(publisher).assertNoError().assertValuesWith(
|
||||||
stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
|
stringConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
|
||||||
":bla\n:bla bla\n:bla bla bla\n"),
|
":bla\n:bla bla\n:bla bla bla\n" +
|
||||||
|
"data:bar\n"),
|
||||||
stringConsumer("\n"));
|
stringConsumer("\n"));
|
||||||
|
|
||||||
});
|
});
|
||||||
|
@ -87,7 +88,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
|
||||||
public void encodeString() {
|
public void encodeString() {
|
||||||
Flux<String> source = Flux.just("foo", "bar");
|
Flux<String> source = Flux.just("foo", "bar");
|
||||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||||
converter.write(source, ResolvableType.forClass(String.class),
|
messageWriter.write(source, ResolvableType.forClass(String.class),
|
||||||
new MediaType("text", "event-stream"), outputMessage);
|
new MediaType("text", "event-stream"), outputMessage);
|
||||||
|
|
||||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||||
|
@ -110,7 +111,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
|
||||||
public void encodeMultiLineString() {
|
public void encodeMultiLineString() {
|
||||||
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
|
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
|
||||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||||
converter.write(source, ResolvableType.forClass(String.class),
|
messageWriter.write(source, ResolvableType.forClass(String.class),
|
||||||
new MediaType("text", "event-stream"), outputMessage);
|
new MediaType("text", "event-stream"), outputMessage);
|
||||||
|
|
||||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
||||||
|
@ -134,7 +135,7 @@ public class SseEventHttpMessageWriterTests extends AbstractDataBufferAllocating
|
||||||
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"),
|
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"),
|
||||||
new Pojo("foofoofoo", "barbarbar"));
|
new Pojo("foofoofoo", "barbarbar"));
|
||||||
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
MockServerHttpResponse outputMessage = new MockServerHttpResponse();
|
||||||
converter.write(source, ResolvableType.forClass(Pojo.class),
|
messageWriter.write(source, ResolvableType.forClass(Pojo.class),
|
||||||
new MediaType("text", "event-stream"), outputMessage);
|
new MediaType("text", "event-stream"), outputMessage);
|
||||||
|
|
||||||
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
Publisher<Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
|
Loading…
Reference in New Issue