Switch DefaultSockJsService to constructor DI

DefaultSockJsService now relies on constructors and requires a
TaskScheduler at a minimum. It no longer needs lifecycle methods.
This commit is contained in:
Rossen Stoyanchev 2013-04-23 21:35:24 -04:00
parent c28ce0e2bd
commit 36148b7cb1
6 changed files with 150 additions and 159 deletions

View File

@ -129,13 +129,13 @@ public abstract class AbstractServerSockJsSession extends AbstractSockJsSession
}
protected void scheduleHeartbeat() {
Assert.notNull(getSockJsConfig().getHeartbeatScheduler(), "heartbeatScheduler not configured");
Assert.notNull(getSockJsConfig().getTaskScheduler(), "heartbeatScheduler not configured");
cancelHeartbeat();
if (!isActive()) {
return;
}
Date time = new Date(System.currentTimeMillis() + getSockJsConfig().getHeartbeatTime());
this.heartbeatTask = getSockJsConfig().getHeartbeatScheduler().schedule(new Runnable() {
this.heartbeatTask = getSockJsConfig().getTaskScheduler().schedule(new Runnable() {
public void run() {
try {
sendHeartbeat();

View File

@ -25,15 +25,12 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.DigestUtils;
@ -49,8 +46,7 @@ import org.springframework.websocket.WebSocketHandler;
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractSockJsService
implements SockJsService, SockJsConfiguration, InitializingBean, DisposableBean {
public abstract class AbstractSockJsService implements SockJsService, SockJsConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@ -71,17 +67,13 @@ public abstract class AbstractSockJsService
private boolean webSocketsEnabled = true;
private final TaskSchedulerHolder heartbeatSchedulerHolder;
private final TaskScheduler taskScheduler;
public AbstractSockJsService() {
this.heartbeatSchedulerHolder = new TaskSchedulerHolder("SockJs-heartbeat-");
}
public AbstractSockJsService(TaskScheduler heartbeatScheduler) {
Assert.notNull(heartbeatScheduler, "heartbeatScheduler is required");
this.heartbeatSchedulerHolder = new TaskSchedulerHolder(heartbeatScheduler);
public AbstractSockJsService(TaskScheduler scheduler) {
Assert.notNull(scheduler, "scheduler is required");
this.taskScheduler = scheduler;
}
/**
@ -159,8 +151,8 @@ public abstract class AbstractSockJsService
return this.heartbeatTime;
}
public TaskScheduler getHeartbeatScheduler() {
return this.heartbeatSchedulerHolder.getScheduler();
public TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
/**
@ -199,16 +191,6 @@ public abstract class AbstractSockJsService
return this.webSocketsEnabled;
}
@Override
public void afterPropertiesSet() throws Exception {
this.heartbeatSchedulerHolder.initialize();
}
@Override
public void destroy() throws Exception {
this.heartbeatSchedulerHolder.destroy();
}
/**
* TODO
*
@ -423,46 +405,4 @@ public abstract class AbstractSockJsService
}
};
/**
* Holds an externally provided or an internally managed TaskScheduler. Provides
* initialize and destroy methods have no effect if the scheduler is externally
* managed.
*/
protected static class TaskSchedulerHolder {
private final TaskScheduler taskScheduler;
private final boolean isDefaultTaskScheduler;
public TaskSchedulerHolder(TaskScheduler taskScheduler) {
Assert.notNull(taskScheduler, "taskScheduler is required");
this.taskScheduler = taskScheduler;
this.isDefaultTaskScheduler = false;
}
public TaskSchedulerHolder(String threadNamePrefix) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix(threadNamePrefix);
this.taskScheduler = scheduler;
this.isDefaultTaskScheduler = true;
}
public TaskScheduler getScheduler() {
return this.taskScheduler;
}
public void initialize() {
if (this.isDefaultTaskScheduler) {
((ThreadPoolTaskScheduler) this.taskScheduler).afterPropertiesSet();
}
}
public void destroy() {
if (this.isDefaultTaskScheduler) {
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
}
}
}
}

View File

@ -48,10 +48,8 @@ public interface SockJsConfiguration {
public long getHeartbeatTime();
/**
* A scheduler instance to use for scheduling heartbeat frames.
* <p>
* By default a {@link ThreadPoolTaskScheduler} with default settings is used.
* A scheduler instance to use for scheduling heart-beat messages.
*/
public TaskScheduler getHeartbeatScheduler();
public TaskScheduler getTaskScheduler();
}

View File

@ -16,10 +16,15 @@
package org.springframework.sockjs.server.support;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.springframework.http.Cookie;
import org.springframework.http.HttpMethod;
@ -41,7 +46,7 @@ import org.springframework.sockjs.server.transport.WebSocketTransportHandler;
import org.springframework.sockjs.server.transport.XhrPollingTransportHandler;
import org.springframework.sockjs.server.transport.XhrStreamingTransportHandler;
import org.springframework.sockjs.server.transport.XhrTransportHandler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.websocket.HandlerProvider;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.server.DefaultHandshakeHandler;
@ -58,101 +63,58 @@ public class DefaultSockJsService extends AbstractSockJsService {
private final Map<TransportType, TransportHandler> transportHandlers = new HashMap<TransportType, TransportHandler>();
private final Map<TransportType, TransportHandler> transportHandlerOverrides = new HashMap<TransportType, TransportHandler>();
private TaskSchedulerHolder sessionTimeoutSchedulerHolder;
private final Map<String, AbstractSockJsSession> sessions = new ConcurrentHashMap<String, AbstractSockJsSession>();
private ScheduledFuture sessionCleanupTask;
public DefaultSockJsService() {
this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder("SockJs-sessionTimeout-");
public DefaultSockJsService(TaskScheduler taskScheduler) {
this(taskScheduler, null);
}
public DefaultSockJsService(TaskScheduler heartbeatScheduler, TaskScheduler sessionTimeoutScheduler) {
Assert.notNull(sessionTimeoutScheduler, "sessionTimeoutScheduler is required");
this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder(sessionTimeoutScheduler);
public DefaultSockJsService(TaskScheduler taskScheduler, Set<TransportHandler> transportHandlers,
TransportHandler... transportHandlerOverrides) {
super(taskScheduler);
transportHandlers = CollectionUtils.isEmpty(transportHandlers) ? getDefaultTransportHandlers() : transportHandlers;
addTransportHandlers(transportHandlers);
addTransportHandlers(Arrays.asList(transportHandlerOverrides));
}
public void setTransportHandlers(TransportHandler... handlers) {
this.transportHandlers.clear();
protected Set<TransportHandler> getDefaultTransportHandlers() {
Set<TransportHandler> result = new HashSet<TransportHandler>();
result.add(new XhrPollingTransportHandler());
result.add(new XhrTransportHandler());
result.add(new JsonpPollingTransportHandler());
result.add(new JsonpTransportHandler());
result.add(new XhrStreamingTransportHandler());
result.add(new EventSourceTransportHandler());
result.add(new HtmlFileTransportHandler());
if (isWebSocketEnabled()) {
try {
result.add(new WebSocketTransportHandler(new DefaultHandshakeHandler()));
}
catch (Exception ex) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to add default WebSocketTransportHandler: " + ex.getMessage());
}
}
}
return result;
}
protected void addTransportHandlers(Collection<TransportHandler> handlers) {
for (TransportHandler handler : handlers) {
if (handler instanceof ConfigurableTransportHandler) {
((ConfigurableTransportHandler) handler).setSockJsConfiguration(this);
}
this.transportHandlers.put(handler.getTransportType(), handler);
}
}
public void setTransportHandlerOverrides(TransportHandler... handlers) {
this.transportHandlerOverrides.clear();
for (TransportHandler handler : handlers) {
this.transportHandlerOverrides.put(handler.getTransportType(), handler);
}
}
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
if (this.transportHandlers.isEmpty()) {
if (isWebSocketEnabled() && (this.transportHandlerOverrides.get(TransportType.WEBSOCKET) == null)) {
this.transportHandlers.put(TransportType.WEBSOCKET,
new WebSocketTransportHandler(new DefaultHandshakeHandler()));
}
this.transportHandlers.put(TransportType.XHR, new XhrPollingTransportHandler());
this.transportHandlers.put(TransportType.XHR_SEND, new XhrTransportHandler());
this.transportHandlers.put(TransportType.JSONP, new JsonpPollingTransportHandler());
this.transportHandlers.put(TransportType.JSONP_SEND, new JsonpTransportHandler());
this.transportHandlers.put(TransportType.XHR_STREAMING, new XhrStreamingTransportHandler());
this.transportHandlers.put(TransportType.EVENT_SOURCE, new EventSourceTransportHandler());
this.transportHandlers.put(TransportType.HTML_FILE, new HtmlFileTransportHandler());
}
if (!this.transportHandlerOverrides.isEmpty()) {
for (TransportHandler transportHandler : this.transportHandlerOverrides.values()) {
this.transportHandlers.put(transportHandler.getTransportType(), transportHandler);
}
}
for (TransportHandler h : this.transportHandlers.values()) {
if (h instanceof ConfigurableTransportHandler) {
((ConfigurableTransportHandler) h).setSockJsConfiguration(this);
}
}
this.sessionTimeoutSchedulerHolder.initialize();
this.sessionTimeoutSchedulerHolder.getScheduler().scheduleAtFixedRate(new Runnable() {
public void run() {
try {
int count = sessions.size();
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace("Checking " + count + " session(s) for timeouts [" + getName() + "]");
}
for (AbstractSockJsSession session : sessions.values()) {
if (session.getTimeSinceLastActive() > getDisconnectDelay()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing " + session + " for [" + getName() + "]");
}
session.close();
sessions.remove(session.getId());
}
}
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace(sessions.size() + " remaining session(s) [" + getName() + "]");
}
}
catch (Throwable t) {
logger.error("Failed to complete session timeout checks for [" + getName() + "]", t);
}
}
}, getDisconnectDelay());
}
@Override
public void destroy() throws Exception {
super.destroy();
this.sessionTimeoutSchedulerHolder.destroy();
public Map<TransportType, TransportHandler> getTransportHandlers() {
return Collections.unmodifiableMap(this.transportHandlers);
}
@Override
@ -239,6 +201,9 @@ public class DefaultSockJsService extends AbstractSockJsService {
if (session != null) {
return session;
}
if (this.sessionCleanupTask == null) {
scheduleSessionTask();
}
logger.debug("Creating new session with session id \"" + sessionId + "\"");
session = (AbstractSockJsSession) sessionFactory.createSession(sessionId, handler);
this.sessions.put(sessionId, session);
@ -249,4 +214,32 @@ public class DefaultSockJsService extends AbstractSockJsService {
return null;
}
private void scheduleSessionTask() {
this.sessionCleanupTask = getTaskScheduler().scheduleAtFixedRate(new Runnable() {
public void run() {
try {
int count = sessions.size();
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace("Checking " + count + " session(s) for timeouts [" + getName() + "]");
}
for (AbstractSockJsSession session : sessions.values()) {
if (session.getTimeSinceLastActive() > getDisconnectDelay()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing " + session + " for [" + getName() + "]");
}
session.close();
sessions.remove(session.getId());
}
}
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace(sessions.size() + " remaining session(s) [" + getName() + "]");
}
}
catch (Throwable t) {
logger.error("Failed to complete session timeout checks for [" + getName() + "]", t);
}
}
}, getDisconnectDelay());
}
}

View File

@ -15,9 +15,11 @@
*/
package org.springframework.websocket.client;
import java.util.ArrayList;
import java.util.List;
import org.springframework.http.HttpHeaders;
import org.springframework.util.CollectionUtils;
import org.springframework.websocket.HandlerProvider;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
@ -37,7 +39,7 @@ public class WebSocketConnectionManager extends AbstractWebSocketConnectionManag
private WebSocketSession webSocketSession;
private List<String> subProtocols;
private final List<String> subProtocols = new ArrayList<String>();
public WebSocketConnectionManager(WebSocketClient webSocketClient,
@ -57,7 +59,10 @@ public class WebSocketConnectionManager extends AbstractWebSocketConnectionManag
}
public void setSubProtocols(List<String> subProtocols) {
this.subProtocols = subProtocols;
this.subProtocols.clear();
if (!CollectionUtils.isEmpty(subProtocols)) {
this.subProtocols.addAll(subProtocols);
}
}
public List<String> getSubProtocols() {

View File

@ -0,0 +1,55 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.support;
import java.util.Map;
import org.junit.Test;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.sockjs.server.TransportType;
import static org.junit.Assert.*;
/**
* Test fixture for {@link DefaultSockJsService}.
*
* @author Rossen Stoyanchev
*/
public class DefaultSockJsServiceTests {
@Test
public void testDefaultTransportHandlers() {
DefaultSockJsService sockJsService = new DefaultSockJsService(new ThreadPoolTaskScheduler());
Map<TransportType, TransportHandler> handlers = sockJsService.getTransportHandlers();
assertEquals(8, handlers.size());
assertNotNull(handlers.get(TransportType.WEBSOCKET));
assertNotNull(handlers.get(TransportType.XHR));
assertNotNull(handlers.get(TransportType.XHR_SEND));
assertNotNull(handlers.get(TransportType.XHR_STREAMING));
assertNotNull(handlers.get(TransportType.JSONP));
assertNotNull(handlers.get(TransportType.JSONP_SEND));
assertNotNull(handlers.get(TransportType.HTML_FILE));
assertNotNull(handlers.get(TransportType.EVENT_SOURCE));
}
}