Merge c14ca9d199
into b256babad5
This commit is contained in:
commit
abc60f15da
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.web.servlet.config.annotation;
|
package org.springframework.web.servlet.config.annotation;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -32,6 +33,7 @@ import org.springframework.web.context.request.async.DeferredResultProcessingInt
|
||||||
* Helps with configuring options for asynchronous request processing.
|
* Helps with configuring options for asynchronous request processing.
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
* @since 3.2
|
* @since 3.2
|
||||||
*/
|
*/
|
||||||
public class AsyncSupportConfigurer {
|
public class AsyncSupportConfigurer {
|
||||||
|
@ -44,6 +46,8 @@ public class AsyncSupportConfigurer {
|
||||||
|
|
||||||
private final List<DeferredResultProcessingInterceptor> deferredResultInterceptors = new ArrayList<>();
|
private final List<DeferredResultProcessingInterceptor> deferredResultInterceptors = new ArrayList<>();
|
||||||
|
|
||||||
|
private @Nullable Duration sseHeartbeatPeriod;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The provided task executor is used for the following:
|
* The provided task executor is used for the following:
|
||||||
|
@ -99,6 +103,14 @@ public class AsyncSupportConfigurer {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure the SSE heartbeat period.
|
||||||
|
* @param sseHeartbeatPeriod The SSE heartbeat period
|
||||||
|
*/
|
||||||
|
public AsyncSupportConfigurer setSseHeartbeatPeriod(Duration sseHeartbeatPeriod) {
|
||||||
|
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
protected @Nullable AsyncTaskExecutor getTaskExecutor() {
|
protected @Nullable AsyncTaskExecutor getTaskExecutor() {
|
||||||
return this.taskExecutor;
|
return this.taskExecutor;
|
||||||
|
@ -116,4 +128,8 @@ public class AsyncSupportConfigurer {
|
||||||
return this.deferredResultInterceptors;
|
return this.deferredResultInterceptors;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected @Nullable Duration getSseHeartbeatPeriod() {
|
||||||
|
return this.sseHeartbeatPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -676,6 +677,7 @@ public class WebMvcConfigurationSupport implements ApplicationContextAware, Serv
|
||||||
}
|
}
|
||||||
adapter.setCallableInterceptors(configurer.getCallableInterceptors());
|
adapter.setCallableInterceptors(configurer.getCallableInterceptors());
|
||||||
adapter.setDeferredResultInterceptors(configurer.getDeferredResultInterceptors());
|
adapter.setDeferredResultInterceptors(configurer.getDeferredResultInterceptors());
|
||||||
|
Optional.ofNullable(configurer.getSseHeartbeatPeriod()).ifPresent(adapter::setSseHeartbeatPeriod);
|
||||||
|
|
||||||
return adapter;
|
return adapter;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,13 @@
|
||||||
package org.springframework.web.servlet.mvc.method.annotation;
|
package org.springframework.web.servlet.mvc.method.annotation;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -54,6 +56,8 @@ import org.springframework.http.converter.ByteArrayHttpMessageConverter;
|
||||||
import org.springframework.http.converter.HttpMessageConverter;
|
import org.springframework.http.converter.HttpMessageConverter;
|
||||||
import org.springframework.http.converter.StringHttpMessageConverter;
|
import org.springframework.http.converter.StringHttpMessageConverter;
|
||||||
import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter;
|
import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter;
|
||||||
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
|
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
|
||||||
import org.springframework.ui.ModelMap;
|
import org.springframework.ui.ModelMap;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
|
@ -123,6 +127,7 @@ import org.springframework.web.util.WebUtils;
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
* @author Sebastien Deleuze
|
* @author Sebastien Deleuze
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
* @since 3.1
|
* @since 3.1
|
||||||
* @see HandlerMethodArgumentResolver
|
* @see HandlerMethodArgumentResolver
|
||||||
* @see HandlerMethodReturnValueHandler
|
* @see HandlerMethodReturnValueHandler
|
||||||
|
@ -201,6 +206,9 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
|
||||||
|
|
||||||
private final Map<ControllerAdviceBean, Set<Method>> modelAttributeAdviceCache = new LinkedHashMap<>();
|
private final Map<ControllerAdviceBean, Set<Method>> modelAttributeAdviceCache = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
private TaskScheduler taskScheduler = new SimpleAsyncTaskScheduler();
|
||||||
|
|
||||||
|
private @Nullable Duration sseHeartbeatPeriod;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide resolvers for custom argument types. Custom resolvers are ordered
|
* Provide resolvers for custom argument types. Custom resolvers are ordered
|
||||||
|
@ -526,6 +534,20 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
|
||||||
this.parameterNameDiscoverer = parameterNameDiscoverer;
|
this.parameterNameDiscoverer = parameterNameDiscoverer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link TaskScheduler}
|
||||||
|
*/
|
||||||
|
public void setTaskScheduler(TaskScheduler taskScheduler) {
|
||||||
|
this.taskScheduler = taskScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the heartbeat period that will be used to periodically prob the SSE connection health
|
||||||
|
*/
|
||||||
|
public void setSseHeartbeatPeriod(@Nullable Duration sseHeartbeatPeriod) {
|
||||||
|
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link ConfigurableBeanFactory} is expected for resolving expressions
|
* A {@link ConfigurableBeanFactory} is expected for resolving expressions
|
||||||
* in method argument default values.
|
* in method argument default values.
|
||||||
|
@ -733,9 +755,12 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
|
||||||
handlers.add(new ModelAndViewMethodReturnValueHandler());
|
handlers.add(new ModelAndViewMethodReturnValueHandler());
|
||||||
handlers.add(new ModelMethodProcessor());
|
handlers.add(new ModelMethodProcessor());
|
||||||
handlers.add(new ViewMethodReturnValueHandler());
|
handlers.add(new ViewMethodReturnValueHandler());
|
||||||
|
|
||||||
|
SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor = Optional.ofNullable(sseHeartbeatPeriod)
|
||||||
|
.map(period -> new SseEmitterHeartbeatExecutor(taskScheduler, period)).orElse(null);
|
||||||
handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
|
handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
|
||||||
this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager,
|
this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager,
|
||||||
initViewResolvers(), initLocaleResolver()));
|
initViewResolvers(), initLocaleResolver(), sseEmitterHeartbeatExecutor));
|
||||||
handlers.add(new StreamingResponseBodyReturnValueHandler());
|
handlers.add(new StreamingResponseBodyReturnValueHandler());
|
||||||
handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
|
handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
|
||||||
this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors));
|
this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors));
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
@ -89,6 +90,7 @@ import org.springframework.web.servlet.view.FragmentsRendering;
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
* @since 4.2
|
* @since 4.2
|
||||||
*/
|
*/
|
||||||
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
|
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
|
||||||
|
@ -101,6 +103,8 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
|
||||||
|
|
||||||
private final LocaleResolver localeResolver;
|
private final LocaleResolver localeResolver;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple constructor with reactive type support based on a default instance of
|
* Simple constructor with reactive type support based on a default instance of
|
||||||
|
@ -143,11 +147,32 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
|
||||||
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
|
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
|
||||||
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver) {
|
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver) {
|
||||||
|
|
||||||
|
this(messageConverters, registry, executor, manager, viewResolvers, localeResolver, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that with added arguments for view rendering.
|
||||||
|
* @param messageConverters converters to write emitted objects with
|
||||||
|
* @param registry for reactive return value type support
|
||||||
|
* @param executor for blocking I/O writes of items emitted from reactive types
|
||||||
|
* @param manager for detecting streaming media types
|
||||||
|
* @param viewResolvers resolvers for fragment stream rendering
|
||||||
|
* @param localeResolver the {@link LocaleResolver} for fragment stream rendering
|
||||||
|
* @param sseEmitterHeartbeatExecutor for sending periodic events to SSE clients
|
||||||
|
* @since 6.2
|
||||||
|
*/
|
||||||
|
public ResponseBodyEmitterReturnValueHandler(
|
||||||
|
List<HttpMessageConverter<?>> messageConverters,
|
||||||
|
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
|
||||||
|
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver,
|
||||||
|
@Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) {
|
||||||
|
|
||||||
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
|
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
|
||||||
this.sseMessageConverters = initSseConverters(messageConverters);
|
this.sseMessageConverters = initSseConverters(messageConverters);
|
||||||
this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager, null);
|
this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager, null);
|
||||||
this.viewResolvers = viewResolvers;
|
this.viewResolvers = viewResolvers;
|
||||||
this.localeResolver = (localeResolver != null ? localeResolver : new AcceptHeaderLocaleResolver());
|
this.localeResolver = (localeResolver != null ? localeResolver : new AcceptHeaderLocaleResolver());
|
||||||
|
this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
|
private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
|
||||||
|
@ -241,6 +266,9 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
|
||||||
}
|
}
|
||||||
|
|
||||||
emitter.initialize(emitterHandler);
|
emitter.initialize(emitterHandler);
|
||||||
|
if (emitter instanceof SseEmitter sseEmitter) {
|
||||||
|
Optional.ofNullable(sseEmitterHeartbeatExecutor).ifPresent(handler -> handler.register(sseEmitter));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -18,14 +18,18 @@ package org.springframework.web.servlet.mvc.method.annotation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.jspecify.annotations.Nullable;
|
import org.jspecify.annotations.Nullable;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.server.ServerHttpResponse;
|
import org.springframework.http.server.ServerHttpResponse;
|
||||||
|
@ -41,10 +45,13 @@ import org.springframework.web.servlet.ModelAndView;
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
* @author Sam Brannen
|
* @author Sam Brannen
|
||||||
* @author Brian Clozel
|
* @author Brian Clozel
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
* @since 4.2
|
* @since 4.2
|
||||||
*/
|
*/
|
||||||
public class SseEmitter extends ResponseBodyEmitter {
|
public class SseEmitter extends ResponseBodyEmitter {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitter.class);
|
||||||
|
|
||||||
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
|
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -52,6 +59,8 @@ public class SseEmitter extends ResponseBodyEmitter {
|
||||||
*/
|
*/
|
||||||
private final Lock writeLock = new ReentrantLock();
|
private final Lock writeLock = new ReentrantLock();
|
||||||
|
|
||||||
|
private volatile @Nullable Long lastEmissionNanoTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new SseEmitter instance.
|
* Create a new SseEmitter instance.
|
||||||
*/
|
*/
|
||||||
|
@ -134,12 +143,31 @@ public class SseEmitter extends ResponseBodyEmitter {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
try {
|
try {
|
||||||
super.send(dataToSend);
|
super.send(dataToSend);
|
||||||
|
this.lastEmissionNanoTime = System.nanoTime();
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void notifyOfHeartbeatTick(Duration heartbeatPeriod) {
|
||||||
|
boolean skip = Optional.ofNullable(lastEmissionNanoTime)
|
||||||
|
.map(lastEmissionNanoTime -> System.nanoTime() - lastEmissionNanoTime)
|
||||||
|
.map(nanoTimeElapsedSinceLastEmission -> nanoTimeElapsedSinceLastEmission < heartbeatPeriod.toNanos())
|
||||||
|
.orElse(false);
|
||||||
|
if (skip) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOGGER.trace("Sending heartbeat to {}", this);
|
||||||
|
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name("ping").data("ping", MediaType.TEXT_PLAIN);
|
||||||
|
try {
|
||||||
|
send(eventBuilder);
|
||||||
|
} catch (IOException | RuntimeException e) {
|
||||||
|
// According to SseEmitter's Javadoc, the container itself will call SseEmitter#completeWithError
|
||||||
|
LOGGER.debug(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "SseEmitter@" + ObjectUtils.getIdentityHexString(this);
|
return "SseEmitter@" + ObjectUtils.getIdentityHexString(this);
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.web.servlet.mvc.method.annotation;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
|
||||||
|
import org.jspecify.annotations.Nullable;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
|
*/
|
||||||
|
class SseEmitterHeartbeatExecutor {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterHeartbeatExecutor.class);
|
||||||
|
|
||||||
|
private final TaskScheduler taskScheduler;
|
||||||
|
private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
private final Object lifecycleMonitor = new Object();
|
||||||
|
|
||||||
|
private final Duration period;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private volatile ScheduledFuture<?> taskFuture;
|
||||||
|
|
||||||
|
public SseEmitterHeartbeatExecutor(TaskScheduler taskScheduler, Duration period) {
|
||||||
|
this.taskScheduler = taskScheduler;
|
||||||
|
this.period = period;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void register(SseEmitter emitter) {
|
||||||
|
startIfNeeded();
|
||||||
|
|
||||||
|
Runnable closeCallback = () -> emitters.remove(emitter);
|
||||||
|
emitter.onCompletion(closeCallback);
|
||||||
|
emitter.onError(t -> closeCallback.run());
|
||||||
|
emitter.onTimeout(closeCallback);
|
||||||
|
|
||||||
|
emitters.add(emitter);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRegistered(SseEmitter emitter) {
|
||||||
|
return emitters.contains(emitter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startIfNeeded() {
|
||||||
|
if (taskFuture != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
synchronized (lifecycleMonitor) {
|
||||||
|
if (taskFuture != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taskFuture = taskScheduler.scheduleAtFixedRate(this::notifyEmitters, period);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyEmitters() {
|
||||||
|
LOGGER.atDebug().log(() -> "Notifying %s emitter(s)".formatted(emitters.size()));
|
||||||
|
|
||||||
|
for (SseEmitter emitter : emitters) {
|
||||||
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
emitter.notifyOfHeartbeatTick(period);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,329 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.web.servlet.mvc.method.annotation;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jspecify.annotations.Nullable;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
|
import org.springframework.scheduling.Trigger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Réda Housni Alaoui
|
||||||
|
*/
|
||||||
|
class SseEmitterHeartbeatExecutorTests {
|
||||||
|
|
||||||
|
private static final MediaType TEXT_PLAIN_UTF8 = new MediaType("text", "plain", StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
private TestTaskScheduler taskScheduler;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void beforeEach() {
|
||||||
|
this.taskScheduler = new TestTaskScheduler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("It sends heartbeat at a fixed rate")
|
||||||
|
void test1() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
assertThat(taskScheduler.fixedRateTask).isNotNull();
|
||||||
|
assertThat(taskScheduler.fixedRatePeriod).isEqualTo(Duration.ofSeconds(5));
|
||||||
|
taskScheduler.fixedRateTask.run();
|
||||||
|
|
||||||
|
emitter.handler.assertSentObjectCount(3);
|
||||||
|
emitter.handler.assertObject(0, "event:ping\ndata:", TEXT_PLAIN_UTF8);
|
||||||
|
emitter.handler.assertObject(1, "ping", MediaType.TEXT_PLAIN);
|
||||||
|
emitter.handler.assertObject(2, "\n\n", TEXT_PLAIN_UTF8);
|
||||||
|
emitter.handler.assertWriteCount(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Emitter is unregistered on completion")
|
||||||
|
void test2() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isTrue();
|
||||||
|
emitter.emitter.complete();
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Emitter is unregistered on error")
|
||||||
|
void test3() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isTrue();
|
||||||
|
emitter.emitter.completeWithError(new RuntimeException());
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Emitter is unregistered on timeout")
|
||||||
|
void test4() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isTrue();
|
||||||
|
emitter.handler.completeWithTimeout();
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Emitters are unregistered on executor shutdown")
|
||||||
|
void test5() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
|
||||||
|
assertThat(executor.isRegistered(emitter.emitter)).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("The task never throws")
|
||||||
|
void test6() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(5));
|
||||||
|
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
assertThat(taskScheduler.fixedRateTask).isNotNull();
|
||||||
|
emitter.handler.exceptionToThrowOnSend = new RuntimeException();
|
||||||
|
|
||||||
|
assertThatCode(() -> taskScheduler.fixedRateTask.run()).doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("The heartbeat rate can be customized")
|
||||||
|
void test7() {
|
||||||
|
SseEmitterHeartbeatExecutor executor = new SseEmitterHeartbeatExecutor(taskScheduler, Duration.ofSeconds(30));
|
||||||
|
TestEmitter emitter = createEmitter();
|
||||||
|
executor.register(emitter.emitter());
|
||||||
|
assertThat(taskScheduler.fixedRateTask).isNotNull();
|
||||||
|
assertThat(taskScheduler.fixedRatePeriod).isEqualTo(Duration.ofSeconds(30));
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestEmitter createEmitter() {
|
||||||
|
SseEmitter sseEmitter = new SseEmitter();
|
||||||
|
TestEmitterHandler handler = new TestEmitterHandler();
|
||||||
|
try {
|
||||||
|
sseEmitter.initialize(handler);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return new TestEmitter(sseEmitter, handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record TestEmitter(SseEmitter emitter, TestEmitterHandler handler) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestEmitterHandler implements ResponseBodyEmitter.Handler {
|
||||||
|
|
||||||
|
private final List<Object> objects = new ArrayList<>();
|
||||||
|
|
||||||
|
private final List<@Nullable MediaType> mediaTypes = new ArrayList<>();
|
||||||
|
|
||||||
|
private final List<Runnable> timeoutCallbacks = new ArrayList<>();
|
||||||
|
private final List<Runnable> completionCallbacks = new ArrayList<>();
|
||||||
|
private final List<Consumer<Throwable>> errorCallbacks = new ArrayList<>();
|
||||||
|
|
||||||
|
private int writeCount;
|
||||||
|
@Nullable
|
||||||
|
private RuntimeException exceptionToThrowOnSend;
|
||||||
|
|
||||||
|
public void assertSentObjectCount(int size) {
|
||||||
|
assertThat(this.objects).hasSize(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assertObject(int index, Object object, MediaType mediaType) {
|
||||||
|
assertThat(index).isLessThanOrEqualTo(this.objects.size());
|
||||||
|
assertThat(this.objects.get(index)).isEqualTo(object);
|
||||||
|
assertThat(this.mediaTypes.get(index)).isEqualTo(mediaType);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assertWriteCount(int writeCount) {
|
||||||
|
assertThat(this.writeCount).isEqualTo(writeCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Object data, @Nullable MediaType mediaType) {
|
||||||
|
failSendIfNeeded();
|
||||||
|
this.objects.add(data);
|
||||||
|
this.mediaTypes.add(mediaType);
|
||||||
|
this.writeCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) {
|
||||||
|
failSendIfNeeded();
|
||||||
|
for (ResponseBodyEmitter.DataWithMediaType item : items) {
|
||||||
|
this.objects.add(item.getData());
|
||||||
|
this.mediaTypes.add(item.getMediaType());
|
||||||
|
}
|
||||||
|
this.writeCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failSendIfNeeded() {
|
||||||
|
Optional.ofNullable(exceptionToThrowOnSend)
|
||||||
|
.ifPresent(e -> {
|
||||||
|
throw e;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompletion(Runnable callback) {
|
||||||
|
completionCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTimeout(Runnable callback) {
|
||||||
|
timeoutCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Consumer<Throwable> callback) {
|
||||||
|
errorCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete() {
|
||||||
|
completionCallbacks.forEach(Runnable::run);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeWithError(Throwable failure) {
|
||||||
|
errorCallbacks.forEach(consumer -> consumer.accept(failure));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void completeWithTimeout() {
|
||||||
|
timeoutCallbacks.forEach(Runnable::run);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestTaskScheduler implements TaskScheduler {
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private Runnable fixedRateTask;
|
||||||
|
@Nullable
|
||||||
|
private Duration fixedRatePeriod;
|
||||||
|
private final TestScheduledFuture<?> fixedRateFuture = new TestScheduledFuture<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
|
||||||
|
this.fixedRateTask = task;
|
||||||
|
this.fixedRatePeriod = period;
|
||||||
|
return fixedRateFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestScheduledFuture<T> implements ScheduledFuture<T> {
|
||||||
|
|
||||||
|
private boolean canceled;
|
||||||
|
private boolean interrupted;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||||
|
canceled = true;
|
||||||
|
interrupted = mayInterruptIfRunning;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(@NotNull TimeUnit timeUnit) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(@NotNull Delayed delayed) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCancelled() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T get(long l, @NotNull TimeUnit timeUnit) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue