Add first cut of SockJS server support

This commit is contained in:
Rossen Stoyanchev 2013-04-01 15:39:04 -04:00
parent 30ab5953f9
commit 88447e503b
57 changed files with 3220 additions and 50 deletions

View File

@ -526,6 +526,9 @@ project("spring-websocket") {
optional("org.eclipse.jetty:jetty-websocket:8.1.10.v20130312")
optional("org.glassfish.tyrus:tyrus-websocket-core:1.0-SNAPSHOT")
optional("com.fasterxml.jackson.core:jackson-databind:2.0.1")
}
repositories {

View File

@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpInputMessage;
import org.springframework.util.Assert;
@ -35,6 +36,8 @@ public class MockHttpInputMessage implements HttpInputMessage {
private final InputStream body;
private final Cookies cookies = new Cookies();
public MockHttpInputMessage(byte[] contents) {
this.body = (contents != null) ? new ByteArrayInputStream(contents) : null;
@ -53,4 +56,8 @@ public class MockHttpInputMessage implements HttpInputMessage {
return this.body;
}
@Override
public Cookies getCookies() {
return this.cookies ;
}
}

View File

@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpOutputMessage;
@ -38,6 +39,7 @@ public class MockHttpOutputMessage implements HttpOutputMessage {
private final ByteArrayOutputStream body = new ByteArrayOutputStream();
private final Cookies cookies = new Cookies();
/**
* Return the headers.
@ -83,4 +85,9 @@ public class MockHttpOutputMessage implements HttpOutputMessage {
}
}
@Override
public Cookies getCookies() {
return this.cookies;
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
public interface Cookie {
String getName();
String getValue();
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class Cookies {
private final List<Cookie> cookies;
public Cookies() {
this.cookies = new ArrayList<Cookie>();
}
private Cookies(Cookies cookies) {
this.cookies = Collections.unmodifiableList(cookies.getCookies());
}
public static Cookies readOnlyCookies(Cookies cookies) {
return new Cookies(cookies);
}
public List<Cookie> getCookies() {
return this.cookies;
}
public Cookie getCookie(String name) {
for (Cookie c : this.cookies) {
if (c.getName().equals(name)) {
return c;
}
}
return null;
}
public Cookie addCookie(String name, String value) {
DefaultCookie cookie = new DefaultCookie(name, value);
this.cookies.add(cookie);
return cookie;
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
import org.springframework.util.Assert;
public class DefaultCookie implements Cookie {
private final String name;
private final String value;
DefaultCookie(String name, String value) {
Assert.hasText(name, "cookie name must not be empty");
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public String getValue() {
return value;
}
}

View File

@ -31,4 +31,9 @@ public interface HttpMessage {
*/
HttpHeaders getHeaders();
/**
* TODO ..
*/
Cookies getCookies();
}

View File

@ -19,6 +19,7 @@ package org.springframework.http.client;
import java.io.IOException;
import java.io.OutputStream;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
@ -44,6 +45,11 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
return getBodyInternal(this.headers);
}
public Cookies getCookies() {
// TODO
throw new UnsupportedOperationException();
}
public final ClientHttpResponse execute() throws IOException {
checkExecuted();
ClientHttpResponse result = executeInternal(this.headers);

View File

@ -18,6 +18,7 @@ package org.springframework.http.client;
import java.io.IOException;
import org.springframework.http.Cookies;
import org.springframework.http.HttpStatus;
/**
@ -32,4 +33,9 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
return HttpStatus.valueOf(getRawStatusCode());
}
public Cookies getCookies() {
// TODO
throw new UnsupportedOperationException();
}
}

View File

@ -17,9 +17,9 @@
package org.springframework.http.client;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
@ -58,4 +58,9 @@ final class BufferingClientHttpRequestWrapper extends AbstractBufferingClientHtt
return new BufferingClientHttpResponseWrapper(response);
}
@Override
public Cookies getCookies() {
return this.request.getCookies();
}
}

View File

@ -20,9 +20,9 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;
/**
@ -67,6 +67,10 @@ final class BufferingClientHttpResponseWrapper implements ClientHttpResponse {
return new ByteArrayInputStream(this.body);
}
public Cookies getCookies() {
return this.response.getCookies();
}
public void close() {
this.response.close();
}

View File

@ -18,6 +18,7 @@ package org.springframework.http.client.support;
import java.net.URI;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
@ -73,4 +74,11 @@ public class HttpRequestWrapper implements HttpRequest {
return this.request.getHeaders();
}
/**
* Returns the cookies of the wrapped request.
*/
public Cookies getCookies() {
return this.request.getCookies();
}
}

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Random;
import org.springframework.core.io.Resource;
import org.springframework.http.Cookies;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpInputMessage;
@ -383,6 +384,11 @@ public class FormHttpMessageConverter implements HttpMessageConverter<MultiValue
return this.os;
}
public Cookies getCookies() {
// TODO
throw new UnsupportedOperationException();
}
private void writeHeaders() throws IOException {
if (!this.headersWritten) {
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {

View File

@ -0,0 +1,34 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server;
/**
* TODO..
*/
public interface AsyncServerHttpRequest extends ServerHttpRequest {
void setTimeout(long timeout);
void startAsync();
boolean isAsyncStarted();
void completeAsync();
boolean isAsyncCompleted();
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.util.Assert;
public class AsyncServletServerHttpRequest extends ServletServerHttpRequest
implements AsyncServerHttpRequest, AsyncListener {
private Long timeout;
private AsyncContext asyncContext;
private AtomicBoolean asyncCompleted = new AtomicBoolean(false);
private final List<Runnable> timeoutHandlers = new ArrayList<Runnable>();
private final List<Runnable> completionHandlers = new ArrayList<Runnable>();
private final HttpServletResponse servletResponse;
/**
* Create a new instance for the given request/response pair.
*/
public AsyncServletServerHttpRequest(HttpServletRequest request, HttpServletResponse response) {
super(request);
this.servletResponse = response;
}
/**
* Timeout period begins after the container thread has exited.
*/
public void setTimeout(long timeout) {
Assert.state(!isAsyncStarted(), "Cannot change the timeout with concurrent handling in progress");
this.timeout = timeout;
}
public void addTimeoutHandler(Runnable timeoutHandler) {
this.timeoutHandlers.add(timeoutHandler);
}
public void addCompletionHandler(Runnable runnable) {
this.completionHandlers.add(runnable);
}
public boolean isAsyncStarted() {
return ((this.asyncContext != null) && getServletRequest().isAsyncStarted());
}
/**
* Whether async request processing has completed.
* <p>It is important to avoid use of request and response objects after async
* processing has completed. Servlet containers often re-use them.
*/
public boolean isAsyncCompleted() {
return this.asyncCompleted.get();
}
public void startAsync() {
Assert.state(getServletRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncCompleted(), "Async processing has already completed");
if (isAsyncStarted()) {
return;
}
this.asyncContext = getServletRequest().startAsync(getServletRequest(), this.servletResponse);
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
public void dispatch() {
Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
this.asyncContext.dispatch();
}
public void completeAsync() {
Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
if (isAsyncStarted() && !isAsyncCompleted()) {
this.asyncContext.complete();
}
}
// ---------------------------------------------------------------------
// Implementation of AsyncListener methods
// ---------------------------------------------------------------------
public void onStartAsync(AsyncEvent event) throws IOException {
}
public void onError(AsyncEvent event) throws IOException {
}
public void onTimeout(AsyncEvent event) throws IOException {
for (Runnable handler : this.timeoutHandlers) {
handler.run();
}
}
public void onComplete(AsyncEvent event) throws IOException {
for (Runnable handler : this.completionHandlers) {
handler.run();
}
this.asyncContext = null;
this.asyncCompleted.set(true);
}
}

View File

@ -18,6 +18,7 @@ package org.springframework.http.server;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpRequest;
import org.springframework.util.MultiValueMap;
/**
* Represents a server-side HTTP request.
@ -27,4 +28,9 @@ import org.springframework.http.HttpRequest;
*/
public interface ServerHttpRequest extends HttpRequest, HttpInputMessage {
/**
* Returns the map of query parameters. Empty if no query has been set.
*/
MultiValueMap<String, String> getQueryParams();
}

View File

@ -33,12 +33,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@link ServerHttpRequest} implementation that is based on a {@link HttpServletRequest}.
@ -58,6 +62,10 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
private HttpHeaders headers;
private Cookies cookies;
private MultiValueMap<String, String> queryParams;
/**
* Construct a new instance of the ServletServerHttpRequest based on the given {@link HttpServletRequest}.
@ -123,6 +131,28 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
return this.headers;
}
public Cookies getCookies() {
if (this.cookies == null) {
this.cookies = new Cookies();
for (Cookie cookie : this.servletRequest.getCookies()) {
this.cookies.addCookie(cookie.getName(), cookie.getValue());
}
}
return this.cookies;
}
public MultiValueMap<String, String> getQueryParams() {
if (this.queryParams == null) {
this.queryParams = new LinkedMultiValueMap<String, String>(this.servletRequest.getParameterMap().size());
for (String name : this.servletRequest.getParameterMap().keySet()) {
for (String value : this.servletRequest.getParameterValues(name)) {
this.queryParams.add(name, value);
}
}
}
return this.queryParams;
}
public InputStream getBody() throws IOException {
if (isFormPost(this.servletRequest)) {
return getBodyFromServletRequestParameters(this.servletRequest);

View File

@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.springframework.http.Cookie;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
@ -40,6 +42,8 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
private boolean headersWritten = false;
private final Cookies cookies = new Cookies();
/**
* Construct a new instance of the ServletServerHttpResponse based on the given {@link HttpServletResponse}.
@ -66,12 +70,18 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
public Cookies getCookies() {
return (this.headersWritten ? Cookies.readOnlyCookies(this.cookies) : this.cookies);
}
public OutputStream getBody() throws IOException {
writeCookies();
writeHeaders();
return this.servletResponse.getOutputStream();
}
public void close() {
writeCookies();
writeHeaders();
}
@ -95,4 +105,13 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
}
private void writeCookies() {
if (!this.headersWritten) {
for (Cookie source : this.cookies.getCookies()) {
javax.servlet.http.Cookie target = new javax.servlet.http.Cookie(source.getName(), source.getValue());
target.setPath("/");
this.servletResponse.addCookie(target);
}
}
}
}

View File

@ -31,6 +31,9 @@ public class MockHttpInputMessage implements HttpInputMessage {
private final InputStream body;
private final Cookies cookies = new Cookies();
public MockHttpInputMessage(byte[] contents) {
Assert.notNull(contents, "'contents' must not be null");
this.body = new ByteArrayInputStream(contents);
@ -50,4 +53,9 @@ public class MockHttpInputMessage implements HttpInputMessage {
public InputStream getBody() throws IOException {
return body;
}
@Override
public Cookies getCookies() {
return this.cookies ;
}
}

View File

@ -32,6 +32,9 @@ public class MockHttpOutputMessage implements HttpOutputMessage {
private final ByteArrayOutputStream body = spy(new ByteArrayOutputStream());
private final Cookies cookies = new Cookies();
@Override
public HttpHeaders getHeaders() {
return headers;
@ -50,4 +53,9 @@ public class MockHttpOutputMessage implements HttpOutputMessage {
byte[] bytes = getBodyAsBytes();
return new String(bytes, charset);
}
@Override
public Cookies getCookies() {
return this.cookies;
}
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
@ -253,6 +254,8 @@ public class InterceptingClientHttpRequestFactoryTests {
private boolean executed = false;
private Cookies cookies = new Cookies();
private RequestMock() {
}
@ -289,6 +292,11 @@ public class InterceptingClientHttpRequestFactoryTests {
executed = true;
return responseMock;
}
@Override
public Cookies getCookies() {
return this.cookies ;
}
}
private static class ResponseMock implements ClientHttpResponse {
@ -299,6 +307,8 @@ public class InterceptingClientHttpRequestFactoryTests {
private HttpHeaders headers = new HttpHeaders();
private Cookies cookies = new Cookies();
@Override
public HttpStatus getStatusCode() throws IOException {
return statusCode;
@ -327,5 +337,10 @@ public class InterceptingClientHttpRequestFactoryTests {
@Override
public void close() {
}
@Override
public Cookies getCookies() {
return this.cookies ;
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SockJsHandler {
void newSession(SockJsSession session) throws Exception;
void handleMessage(SockJsSession session, String message) throws Exception;
void handleException(SockJsSession session, Throwable exception);
void sessionClosed(SockJsSession session);
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SockJsHandlerAdapter implements SockJsHandler {
@Override
public void newSession(SockJsSession session) throws Exception {
}
@Override
public void handleMessage(SockJsSession session, String message) throws Exception {
}
@Override
public void handleException(SockJsSession session, Throwable exception) {
}
@Override
public void sessionClosed(SockJsSession session) {
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SockJsSession {
void sendMessage(String text) throws Exception;
void close();
}

View File

@ -0,0 +1,128 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class SockJsSessionSupport implements SockJsSession {
protected Log logger = LogFactory.getLog(this.getClass());
private final String sessionId;
private final SockJsHandler delegate;
private State state = State.NEW;
private long timeCreated = System.currentTimeMillis();
private long timeLastActive = System.currentTimeMillis();
/**
*
* @param sessionId
* @param delegate the recipient of SockJS messages
*/
public SockJsSessionSupport(String sessionId, SockJsHandler delegate) {
Assert.notNull(sessionId, "sessionId is required");
Assert.notNull(delegate, "SockJsHandler is required");
this.sessionId = sessionId;
this.delegate = delegate;
}
public String getId() {
return this.sessionId;
}
public SockJsHandler getSockJsHandler() {
return this.delegate;
}
public boolean isNew() {
return State.NEW.equals(this.state);
}
public boolean isOpen() {
return State.OPEN.equals(this.state);
}
public boolean isClosed() {
return State.CLOSED.equals(this.state);
}
/**
* Polling and Streaming sessions periodically close the current HTTP request and
* wait for the next request to come through. During this "downtime" the session is
* still open but inactive and unable to send messages and therefore has to buffer
* them temporarily. A WebSocket session by contrast is stateful and remain active
* until closed.
*/
public abstract boolean isActive();
/**
* Return the time since the session was last active, or otherwise if the
* session is new, the time since the session was created.
*/
public long getTimeSinceLastActive() {
if (isNew()) {
return (System.currentTimeMillis() - this.timeCreated);
}
else {
return isActive() ? 0 : System.currentTimeMillis() - this.timeLastActive;
}
}
/**
* Should be invoked whenever the session becomes inactive.
*/
protected void updateLastActiveTime() {
this.timeLastActive = System.currentTimeMillis();
}
public void connectionInitialized() throws Exception {
this.state = State.OPEN;
this.delegate.newSession(this);
}
public void delegateMessages(String... messages) throws Exception {
for (String message : messages) {
this.delegate.handleMessage(this, message);
}
}
public void close() {
this.state = State.CLOSED;
}
public String toString() {
return getClass().getSimpleName() + " [id=" + sessionId + "]";
}
private enum State { NEW, OPEN, CLOSED }
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs;
import org.springframework.http.HttpMethod;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public enum TransportType {
WEBSOCKET("websocket", HttpMethod.GET, false /* CORS ? */),
XHR("xhr", HttpMethod.POST, true),
XHR_SEND("xhr_send", HttpMethod.POST, true),
JSONP("jsonp", HttpMethod.GET, false),
JSONP_SEND("jsonp_send", HttpMethod.POST, false),
XHR_STREAMING("xhr_streaming", HttpMethod.POST, true),
EVENT_SOURCE("eventsource", HttpMethod.GET, false),
HTML_FILE("htmlfile", HttpMethod.GET, false);
private final String value;
private final HttpMethod httpMethod;
private final boolean corsSupported;
private TransportType(String value, HttpMethod httpMethod, boolean supportsCors) {
this.value = value;
this.httpMethod = httpMethod;
this.corsSupported = supportsCors;
}
public String value() {
return this.value;
}
/**
* The HTTP method for this transport.
*/
public HttpMethod getHttpMethod() {
return this.httpMethod;
}
/**
* Are cross-domain requests (CORS) supported?
*/
public boolean isCorsSupported() {
return this.corsSupported;
}
public static TransportType fromValue(String transportValue) {
for (TransportType type : values()) {
if (type.value().equals(transportValue)) {
return type;
}
}
throw new IllegalArgumentException("No matching constant for [" + transportValue + "]");
}
@Override
public String toString() {
return this.value;
}
}

View File

@ -0,0 +1,151 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import java.io.EOFException;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSession;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.util.Assert;
/**
* Provides partial implementations of {@link SockJsSession} methods to send messages,
* including heartbeat messages and to manage session state.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractServerSession extends SockJsSessionSupport {
private final SockJsConfiguration sockJsConfig;
private ScheduledFuture<?> heartbeatTask;
public AbstractServerSession(String sessionId, SockJsHandler delegate, SockJsConfiguration sockJsConfig) {
super(sessionId, delegate);
Assert.notNull(sockJsConfig, "sockJsConfig is required");
this.sockJsConfig = sockJsConfig;
}
public SockJsConfiguration getSockJsConfig() {
return this.sockJsConfig;
}
public final synchronized void sendMessage(String message) {
Assert.isTrue(!isClosed(), "Cannot send a message, session has been closed");
sendMessageInternal(message);
}
protected abstract void sendMessageInternal(String message);
public final synchronized void close() {
if (!isClosed()) {
logger.debug("Closing session");
// set the status
super.close();
if (isActive()) {
// deliver messages "in flight" before sending close frame
writeFrame(SockJsFrame.closeFrameGoAway());
}
cancelHeartbeat();
closeInternal();
getSockJsHandler().sessionClosed(this);
}
}
protected abstract void closeInternal();
/**
* For internal use within a TransportHandler and the (TransportHandler-specific)
* session sub-class. The frame is written only if the connection is active.
*/
protected void writeFrame(SockJsFrame frame) {
if (logger.isTraceEnabled()) {
logger.trace("Preparing to write " + frame);
}
try {
writeFrameInternal(frame);
}
catch (EOFException ex) {
logger.warn("Failed to send message due to client disconnect. Terminating connection abruptly");
deactivate();
close();
}
catch (Throwable t) {
logger.error("Failed to send message. Terminating connection abruptly", t);
deactivate();
close();
}
}
protected abstract void writeFrameInternal(SockJsFrame frame) throws Exception;
/**
* Some {@link TransportHandler} types cannot detect if a client connection is closed
* or lost and will eventually fail to send messages. When that happens, we need a way
* to disconnect the underlying connection before calling {@link #close()}.
*/
protected abstract void deactivate();
public synchronized void sendHeartbeat() {
if (isActive()) {
writeFrame(SockJsFrame.heartbeatFrame());
scheduleHeartbeat();
}
}
protected void scheduleHeartbeat() {
Assert.notNull(getSockJsConfig().getHeartbeatScheduler(), "heartbeatScheduler not configured");
cancelHeartbeat();
if (!isActive()) {
return;
}
Date time = new Date(System.currentTimeMillis() + getSockJsConfig().getHeartbeatTime());
this.heartbeatTask = getSockJsConfig().getHeartbeatScheduler().schedule(new Runnable() {
public void run() {
sendHeartbeat();
}
}, time);
if (logger.isTraceEnabled()) {
logger.trace("Scheduled heartbeat after " + getSockJsConfig().getHeartbeatTime()/1000 + " seconds");
}
}
protected void cancelHeartbeat() {
if ((this.heartbeatTask != null) && !this.heartbeatTask.isDone()) {
if (logger.isTraceEnabled()) {
logger.trace("Cancelling heartbeat");
}
this.heartbeatTask.cancel(false);
}
this.heartbeatTask = null;
}
}

View File

@ -0,0 +1,431 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.support.DefaultTransportHandlerRegistrar;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.websocket.server.HandshakeRequestHandler;
/**
* Provides support for SockJS configuration options and serves the static SockJS URLs.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractSockJsService implements SockJsConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
private static final int ONE_YEAR = 365 * 24 * 60 * 60;
private String sockJsServiceName = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
private String clientLibraryUrl = "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js";
private int streamBytesLimit = 128 * 1024;
private boolean jsessionIdCookieNeeded = true;
private long heartbeatTime = 25 * 1000;
private TaskScheduler heartbeatScheduler;
private long disconnectDelay = 5 * 1000;
private boolean webSocketsEnabled = true;
private HandshakeRequestHandler handshakeRequestHandler;
/**
* Class constructor...
*
*/
public AbstractSockJsService() {
this.heartbeatScheduler = createScheduler("SockJs-heartbeat-");
}
protected TaskScheduler createScheduler(String threadNamePrefix) {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix(threadNamePrefix);
return scheduler;
}
/**
* A unique name for the service, possibly the prefix at which it is deployed.
* Used mainly for logging purposes.
*/
public void setSockJsServiceName(String serviceName) {
this.sockJsServiceName = serviceName;
}
/**
* The SockJS service name.
* @see #setSockJsServiceName(String)
*/
public String getSockJsServiceName() {
return this.sockJsServiceName;
}
/**
* Transports which don't support cross-domain communication natively (e.g.
* "eventsource", "htmlfile") rely on serving a simple page (using the
* "foreign" domain) from an invisible iframe. Code run from this iframe
* doesn't need to worry about cross-domain issues since it is running from
* a domain local to the SockJS server. The iframe does need to load the
* SockJS javascript client library and this option allows configuring its
* url.
* <p>
* By default this is set to point to
* "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js".
*/
public AbstractSockJsService setSockJsClientLibraryUrl(String clientLibraryUrl) {
this.clientLibraryUrl = clientLibraryUrl;
return this;
}
/**
* The URL to the SockJS JavaScript client library.
* @see #setSockJsClientLibraryUrl(String)
*/
public String getSockJsClientLibraryUrl() {
return this.clientLibraryUrl;
}
public AbstractSockJsService setStreamBytesLimit(int streamBytesLimit) {
this.streamBytesLimit = streamBytesLimit;
return this;
}
public int getStreamBytesLimit() {
return streamBytesLimit;
}
/**
* Some load balancers do sticky sessions, but only if there is a JSESSIONID
* cookie. Even if it is set to a dummy value, it doesn't matter since
* session information is added by the load balancer.
* <p>
* Set this option to indicate if a JSESSIONID cookie should be created. The
* default value is "true".
*/
public AbstractSockJsService setJsessionIdCookieNeeded(boolean jsessionIdCookieNeeded) {
this.jsessionIdCookieNeeded = jsessionIdCookieNeeded;
return this;
}
/**
* Whether setting JSESSIONID cookie is necessary.
* @see #setJsessionIdCookieNeeded(boolean)
*/
public boolean isJsessionIdCookieNeeded() {
return this.jsessionIdCookieNeeded;
}
public AbstractSockJsService setHeartbeatTime(long heartbeatTime) {
this.heartbeatTime = heartbeatTime;
return this;
}
public long getHeartbeatTime() {
return this.heartbeatTime;
}
public TaskScheduler getHeartbeatScheduler() {
return this.heartbeatScheduler;
}
public void setHeartbeatScheduler(TaskScheduler heartbeatScheduler) {
Assert.notNull(heartbeatScheduler, "heartbeatScheduler is required");
this.heartbeatScheduler = heartbeatScheduler;
}
public AbstractSockJsService setDisconnectDelay(long disconnectDelay) {
this.disconnectDelay = disconnectDelay;
return this;
}
public long getDisconnectDelay() {
return this.disconnectDelay;
}
/**
* Some load balancers don't support websockets. This option can be used to
* disable the WebSocket transport on the server side.
* <p>
* The default value is "true".
*/
public AbstractSockJsService setWebSocketsEnabled(boolean webSocketsEnabled) {
this.webSocketsEnabled = webSocketsEnabled;
return this;
}
/**
* Whether WebSocket transport is enabled.
* @see #setWebSocketsEnabled(boolean)
*/
public boolean isWebSocketsEnabled() {
return this.webSocketsEnabled;
}
/**
* SockJS exposes an entry point at "/websocket" for raw WebSocket
* communication without additional custom framing, e.g. no open frame, no
* heartbeats, only raw WebSocket protocol. This property allows setting a
* handler for requests for raw WebSocket communication.
*/
public AbstractSockJsService setWebsocketHandler(HandshakeRequestHandler handshakeRequestHandler) {
this.handshakeRequestHandler = handshakeRequestHandler;
return this;
}
/**
* TODO
*
* @param request
* @param response
* @param sockJsPath
*
* @throws Exception
*/
public final void handleRequest(ServerHttpRequest request, ServerHttpResponse response, String sockJsPath)
throws Exception {
logger.debug(request.getMethod() + " [" + sockJsPath + "]");
try {
request.getHeaders();
}
catch (IllegalArgumentException ex) {
// Ignore invalid Content-Type (TODO!!)
}
if (sockJsPath.equals("") || sockJsPath.equals("/")) {
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));
response.getBody().write("Welcome to SockJS!\n".getBytes("UTF-8"));
return;
}
else if (sockJsPath.equals("/info")) {
this.infoHandler.handle(request, response);
return;
}
else if (sockJsPath.matches("/iframe[0-9-.a-z_]*.html")) {
this.iframeHandler.handle(request, response);
return;
}
else if (sockJsPath.equals("/websocket")) {
Assert.notNull(this.handshakeRequestHandler, "No handler for raw Websockets configured");
this.handshakeRequestHandler.doHandshake(request, response);
return;
}
String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/");
if (pathSegments.length != 3) {
logger.debug("Expected /{server}/{session}/{transport} but got " + sockJsPath);
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
String serverId = pathSegments[0];
String sessionId = pathSegments[1];
String transport = pathSegments[2];
if (!validateRequest(serverId, sessionId, transport)) {
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
handleRequestInternal(request, response, sessionId, TransportType.fromValue(transport));
}
protected boolean validateRequest(String serverId, String sessionId, String transport) {
if (!StringUtils.hasText(serverId) || !StringUtils.hasText(sessionId) || !StringUtils.hasText(transport)) {
logger.debug("Empty server, session, or transport value");
return false;
}
// Server and session id's must not contain "."
if (serverId.contains(".") || sessionId.contains(".")) {
logger.debug("Server or session contain a \".\"");
return false;
}
if (!isWebSocketsEnabled() && transport.equals(TransportType.WEBSOCKET.value())) {
logger.debug("Websocket transport is disabled");
return false;
}
return true;
}
protected abstract void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
String sessionId, TransportType transportType) throws Exception;
protected void addCorsHeaders(ServerHttpRequest request, ServerHttpResponse response, HttpMethod... httpMethods) {
String origin = request.getHeaders().getFirst("origin");
origin = ((origin == null) || origin.equals("null")) ? "*" : origin;
response.getHeaders().add("Access-Control-Allow-Origin", origin);
response.getHeaders().add("Access-Control-Allow-Credentials", "true");
List<String> accessControllerHeaders = request.getHeaders().get("Access-Control-Request-Headers");
if (accessControllerHeaders != null) {
for (String header : accessControllerHeaders) {
response.getHeaders().add("Access-Control-Allow-Headers", header);
}
}
if (!ObjectUtils.isEmpty(httpMethods)) {
response.getHeaders().add("Access-Control-Allow-Methods", StringUtils.arrayToDelimitedString(httpMethods, ", "));
response.getHeaders().add("Access-Control-Max-Age", String.valueOf(ONE_YEAR));
}
}
protected void addCacheHeaders(ServerHttpResponse response) {
response.getHeaders().setCacheControl("public, max-age=" + ONE_YEAR);
response.getHeaders().setExpires(new Date().getTime() + ONE_YEAR * 1000);
}
protected void addNoCacheHeaders(ServerHttpResponse response) {
response.getHeaders().setCacheControl("no-store, no-cache, must-revalidate, max-age=0");
}
protected void sendMethodNotAllowed(ServerHttpResponse response, List<HttpMethod> httpMethods) throws IOException {
logger.debug("Sending Method Not Allowed (405)");
response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED);
response.getHeaders().setAllow(new HashSet<HttpMethod>(httpMethods));
response.getBody(); // ensure headers are flushed (TODO!)
}
private interface SockJsRequestHandler {
void handle(ServerHttpRequest request, ServerHttpResponse response) throws Exception;
}
private static final Random random = new Random();
private final SockJsRequestHandler infoHandler = new SockJsRequestHandler() {
private static final String INFO_CONTENT =
"{\"entropy\":%s,\"origins\":[\"*:*\"],\"cookie_needed\":%s,\"websocket\":%s}";
public void handle(ServerHttpRequest request, ServerHttpResponse response) throws Exception {
if (HttpMethod.GET.equals(request.getMethod())) {
response.getHeaders().setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
addCorsHeaders(request, response);
addNoCacheHeaders(response);
String content = String.format(INFO_CONTENT, random.nextInt(), isJsessionIdCookieNeeded(), isWebSocketsEnabled());
response.getBody().write(content.getBytes());
}
else if (HttpMethod.OPTIONS.equals(request.getMethod())) {
response.setStatusCode(HttpStatus.NO_CONTENT);
addCorsHeaders(request, response, HttpMethod.GET, HttpMethod.OPTIONS);
addCacheHeaders(response);
response.getBody(); // ensure headers are flushed (TODO!)
}
else {
sendMethodNotAllowed(response, Arrays.asList(HttpMethod.OPTIONS, HttpMethod.GET));
}
}
};
private final SockJsRequestHandler iframeHandler = new SockJsRequestHandler() {
private static final String IFRAME_CONTENT =
"<!DOCTYPE html>\n" +
"<html>\n" +
"<head>\n" +
" <meta http-equiv=\"X-UA-Compatible\" content=\"IE=edge\" />\n" +
" <meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\" />\n" +
" <script>\n" +
" document.domain = document.domain;\n" +
" _sockjs_onload = function(){SockJS.bootstrap_iframe();};\n" +
" </script>\n" +
" <script src=\"%s\"></script>\n" +
"</head>\n" +
"<body>\n" +
" <h2>Don't panic!</h2>\n" +
" <p>This is a SockJS hidden iframe. It's used for cross domain magic.</p>\n" +
"</body>\n" +
"</html>";
public void handle(ServerHttpRequest request, ServerHttpResponse response) throws Exception {
if (!HttpMethod.GET.equals(request.getMethod())) {
sendMethodNotAllowed(response, Arrays.asList(HttpMethod.GET));
return;
}
String content = String.format(IFRAME_CONTENT, getSockJsClientLibraryUrl());
byte[] contentBytes = content.getBytes(Charset.forName("UTF-8"));
StringBuilder builder = new StringBuilder("\"0");
DigestUtils.appendMd5DigestAsHex(contentBytes, builder);
builder.append('"');
String etagValue = builder.toString();
List<String> ifNoneMatch = request.getHeaders().getIfNoneMatch();
if (!CollectionUtils.isEmpty(ifNoneMatch) && ifNoneMatch.get(0).equals(etagValue)) {
response.setStatusCode(HttpStatus.NOT_MODIFIED);
return;
}
response.getHeaders().setContentType(new MediaType("text", "html", Charset.forName("UTF-8")));
response.getHeaders().setContentLength(contentBytes.length);
addCacheHeaders(response);
response.getHeaders().setETag(etagValue);
response.getBody().write(contentBytes);
}
};
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
*
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface SockJsConfiguration {
/**
* Streaming transports save responses on the client side and don't free
* memory used by delivered messages. Such transports need to recycle the
* connection once in a while. This property sets a minimum number of bytes
* that can be send over a single HTTP streaming request before it will be
* closed. After that client will open a new request. Setting this value to
* one effectively disables streaming and will make streaming transports to
* behave like polling transports.
* <p>
* The default value is 128K (i.e. 128 * 1024).
*/
public int getStreamBytesLimit();
/**
* The amount of time in milliseconds before a client is considered
* disconnected after not having a receiving connection, i.e. an active
* connection over which the server can send data to the client.
* <p>
* The default value is 5000.
*/
public long getDisconnectDelay();
/**
* The amount of time in milliseconds when the server has not sent any
* messages and after which the server should send a heartbeat frame to the
* client in order to keep the connection from breaking.
* <p>
* The default value is 25,000 (25 seconds).
*/
public long getHeartbeatTime();
/**
* A scheduler instance to use for scheduling heartbeat frames.
* <p>
* By default a {@link ThreadPoolTaskScheduler} with default settings is used.
*/
public TaskScheduler getHeartbeatScheduler();
}

View File

@ -0,0 +1,167 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import java.nio.charset.Charset;
import org.springframework.util.Assert;
import com.fasterxml.jackson.core.io.JsonStringEncoder;
/**
*
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SockJsFrame {
private static final SockJsFrame OPEN_FRAME = new SockJsFrame("o");
private static final SockJsFrame HEARTBEAT_FRAME = new SockJsFrame("h");
private static final SockJsFrame CLOSE_GO_AWAY_FRAME = closeFrame(3000, "Go away!");
private static final SockJsFrame CLOSE_ANOTHER_CONNECTION_OPEN = closeFrame(2010, "Another connection still open");
private final String content;
private SockJsFrame(String content) {
this.content = content;
}
public static SockJsFrame openFrame() {
return OPEN_FRAME;
}
public static SockJsFrame heartbeatFrame() {
return HEARTBEAT_FRAME;
}
public static SockJsFrame messageFrame(String... messages) {
return new MessageFrame(messages);
}
public static SockJsFrame closeFrameGoAway() {
return CLOSE_GO_AWAY_FRAME;
}
public static SockJsFrame closeFrameAnotherConnectionOpen() {
return CLOSE_ANOTHER_CONNECTION_OPEN;
}
public static SockJsFrame closeFrame(int code, String reason) {
return new SockJsFrame("c[" + code + ",\"" + reason + "\"]");
}
public String getContent() {
return this.content;
}
public byte[] getContentBytes() {
return this.content.getBytes(Charset.forName("UTF-8"));
}
public String toString() {
String quoted = this.content.replace("\n", "\\n").replace("\r", "\\r");
return "SockJsFrame content='" + quoted + "'";
}
private static class MessageFrame extends SockJsFrame {
public MessageFrame(String... messages) {
super(prepareContent(messages));
}
public static String prepareContent(String... messages) {
Assert.notNull(messages, "messages required");
StringBuilder sb = new StringBuilder();
sb.append("a[");
for (int i=0; i < messages.length; i++) {
sb.append('"');
// TODO: dependency on Jackson
char[] quotedChars = JsonStringEncoder.getInstance().quoteAsString(messages[i]);
sb.append(escapeSockJsCharacters(quotedChars));
sb.append('"');
if (i < messages.length - 1) {
sb.append(',');
}
}
sb.append(']');
return sb.toString();
}
private static String escapeSockJsCharacters(char[] chars) {
StringBuilder result = new StringBuilder();
for (char ch : chars) {
if (isSockJsEscapeCharacter(ch)) {
result.append('\\').append('u');
String hex = Integer.toHexString(ch).toLowerCase();
for (int i = 0; i < (4 - hex.length()); i++) {
result.append('0');
}
result.append(hex);
}
else {
result.append(ch);
}
}
return result.toString();
}
private static boolean isSockJsEscapeCharacter(char ch) {
return (ch >= '\u0000' && ch <= '\u001F') || (ch >= '\u200C' && ch <= '\u200F')
|| (ch >= '\u2028' && ch <= '\u202F') || (ch >= '\u2060' && ch <= '\u206F')
|| (ch >= '\uFFF0' && ch <= '\uFFFF') || (ch >= '\uD800' && ch <= '\uDFFF');
}
}
public interface FrameFormat {
SockJsFrame format(SockJsFrame frame);
}
public static class DefaultFrameFormat implements FrameFormat {
private final String format;
public DefaultFrameFormat(String format) {
Assert.notNull(format, "format is required");
this.format = format;
}
/**
*
* @param format a String with a single %s formatting character where the
* frame content is to be inserted; e.g. "data: %s\r\n\r\n"
* @return new SockJsFrame instance with the formatted content
*/
public SockJsFrame format(SockJsFrame frame) {
String content = String.format(this.format, preProcessContent(frame.getContent()));
return new SockJsFrame(content);
}
protected String preProcessContent(String content) {
return content;
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.websocket.WebSocketSession;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SockJsWebSocketSessionAdapter extends AbstractServerSession {
private static Log logger = LogFactory.getLog(SockJsWebSocketSessionAdapter.class);
private WebSocketSession webSocketSession;
public SockJsWebSocketSessionAdapter(String sessionId, SockJsHandler delegate, SockJsConfiguration sockJsConfig) {
super(sessionId, delegate, sockJsConfig);
}
public void setWebSocketSession(WebSocketSession webSocketSession) throws Exception {
this.webSocketSession = webSocketSession;
scheduleHeartbeat();
connectionInitialized();
}
@Override
public boolean isActive() {
return (this.webSocketSession != null);
}
@Override
public void sendMessageInternal(String message) {
cancelHeartbeat();
writeFrame(SockJsFrame.messageFrame(message));
scheduleHeartbeat();
}
@Override
protected void writeFrameInternal(SockJsFrame frame) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Write " + frame);
}
this.webSocketSession.sendText(frame.getContent());
}
@Override
public void closeInternal() {
this.webSocketSession.close();
this.webSocketSession = null;
updateLastActiveTime();
}
@Override
protected void deactivate() {
this.webSocketSession.close();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2002-2013 the toriginal author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TransportHandler {
TransportType getTransportType();
SockJsSessionSupport createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config);
void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception;
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TransportHandlerRegistrar {
void registerTransportHandlers(TransportHandlerRegistry registry);
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface TransportHandlerRegistry {
void registerHandler(TransportHandler handler);
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.StringUtils;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.WebSocketSession;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketSockJsHandlerAdapter implements WebSocketHandler {
private static final Log logger = LogFactory.getLog(WebSocketSockJsHandlerAdapter.class);
private final SockJsWebSocketSessionAdapter sockJsSession;
// TODO: the JSON library used must be configurable
private final ObjectMapper objectMapper = new ObjectMapper();
public WebSocketSockJsHandlerAdapter(SockJsWebSocketSessionAdapter sockJsSession) {
this.sockJsSession = sockJsSession;
}
@Override
public void newSession(WebSocketSession webSocketSession) throws Exception {
logger.debug("WebSocket connection established");
webSocketSession.sendText(SockJsFrame.openFrame().getContent());
this.sockJsSession.setWebSocketSession(webSocketSession);
}
@Override
public void handleTextMessage(WebSocketSession session, String message) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Received payload " + message + " for " + sockJsSession);
}
if (StringUtils.isEmpty(message)) {
logger.debug("Ignoring empty payload");
return;
}
try {
String[] messages = this.objectMapper.readValue(message, String[].class);
this.sockJsSession.delegateMessages(messages);
}
catch (IOException e) {
logger.error("Broken data received. Terminating WebSocket connection abruptly", e);
session.close();
}
}
@Override
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
// should not happen
throw new UnsupportedOperationException();
}
@Override
public void handleException(WebSocketSession session, Throwable exception) {
exception.printStackTrace();
}
@Override
public void sessionClosed(WebSocketSession session, int statusCode, String reason) throws Exception {
logger.debug("WebSocket connection closed for " + this.sockJsSession);
this.sockJsSession.close();
}
}

View File

@ -0,0 +1,216 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.support;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.http.Cookie;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.AbstractSockJsService;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.sockjs.server.TransportHandlerRegistrar;
import org.springframework.sockjs.server.TransportHandlerRegistry;
import org.springframework.util.Assert;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class DefaultSockJsService extends AbstractSockJsService implements TransportHandlerRegistry, InitializingBean {
private static final AtomicLong webSocketSessionIdSuffix = new AtomicLong();
private final SockJsHandler sockJsHandler;
private TaskScheduler sessionTimeoutScheduler;
private final Map<String, SockJsSessionSupport> sessions = new ConcurrentHashMap<String, SockJsSessionSupport>();
private final Map<TransportType, TransportHandler> transportHandlers = new HashMap<TransportType, TransportHandler>();
/**
* Class constructor...
*
*/
public DefaultSockJsService(SockJsHandler sockJsHandler) {
Assert.notNull(sockJsHandler, "sockJsHandler is required");
this.sockJsHandler = sockJsHandler;
this.sessionTimeoutScheduler = createScheduler("SockJs-sessionTimeout-");
new DefaultTransportHandlerRegistrar().registerTransportHandlers(this);
}
/**
* A scheduler instance to use for scheduling periodic expires session cleanup.
* <p>
* By default a {@link ThreadPoolTaskScheduler} with default settings is used.
*/
public TaskScheduler getSessionTimeoutScheduler() {
return this.sessionTimeoutScheduler;
}
public void setSessionTimeoutScheduler(TaskScheduler sessionTimeoutScheduler) {
Assert.notNull(sessionTimeoutScheduler, "sessionTimeoutScheduler is required");
this.sessionTimeoutScheduler = sessionTimeoutScheduler;
}
@Override
public void registerHandler(TransportHandler transportHandler) {
Assert.notNull(transportHandler, "transportHandler is required");
this.transportHandlers.put(transportHandler.getTransportType(), transportHandler);
}
public void setTransportHandlerRegistrar(TransportHandlerRegistrar registrar) {
Assert.notNull(registrar, "registrar is required");
this.transportHandlers.clear();
registrar.registerTransportHandlers(this);
}
@Override
public void afterPropertiesSet() throws Exception {
this.sessionTimeoutScheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
int count = sessions.size();
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace("Checking " + count + " session(s) for timeouts [" + getSockJsServiceName() + "]");
}
for (SockJsSessionSupport session : sessions.values()) {
if (session.getTimeSinceLastActive() > getDisconnectDelay()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing " + session + " for [" + getSockJsServiceName() + "]");
}
session.close();
sessions.remove(session.getId());
}
}
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace(sessions.size() + " remaining session(s) [" + getSockJsServiceName() + "]");
}
}
catch (Throwable t) {
logger.error("Failed to complete session timeout checks for [" + getSockJsServiceName() + "]", t);
}
}
}, getDisconnectDelay());
}
@Override
protected void handleRequestInternal(ServerHttpRequest request, ServerHttpResponse response,
String sessionId, TransportType transportType) throws Exception {
TransportHandler transportHandler = this.transportHandlers.get(transportType);
if (transportHandler == null) {
logger.debug("Transport handler not found");
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
HttpMethod supportedMethod = transportType.getHttpMethod();
if (!supportedMethod.equals(request.getMethod())) {
if (HttpMethod.OPTIONS.equals(request.getMethod()) && transportType.isCorsSupported()) {
response.setStatusCode(HttpStatus.NO_CONTENT);
addCorsHeaders(request, response, supportedMethod, HttpMethod.OPTIONS);
addCacheHeaders(response);
response.getBody(); // ensure headers are flushed (TODO!)
}
else {
List<HttpMethod> supportedMethods = Arrays.asList(supportedMethod);
if (transportType.isCorsSupported()) {
supportedMethods.add(HttpMethod.OPTIONS);
}
sendMethodNotAllowed(response, supportedMethods);
}
return;
}
SockJsSessionSupport session = getSockJsSession(sessionId, transportHandler);
if (session == null) {
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
addNoCacheHeaders(response);
if (isJsessionIdCookieNeeded()) {
Cookie cookie = request.getCookies().getCookie("JSESSIONID");
String jsid = (cookie != null) ? cookie.getValue() : "dummy";
// TODO: Jetty sets Expires header, so bypass Cookie object for now
response.getHeaders().set("Set-Cookie", "JSESSIONID=" + jsid + ";path=/"); // TODO
}
if (transportType.isCorsSupported()) {
addCorsHeaders(request, response);
}
transportHandler.handleRequest(request, response, session);
response.close(); // ensure headers are flushed (TODO !!)
}
public SockJsSessionSupport getSockJsSession(String sessionId, TransportHandler transportHandler) {
TransportType transportType = transportHandler.getTransportType();
// Always create new session for WebSocket requests
sessionId = TransportType.WEBSOCKET.equals(transportType) ?
sessionId + "#" + webSocketSessionIdSuffix.getAndIncrement() : sessionId;
SockJsSessionSupport session = this.sessions.get(sessionId);
if (session != null) {
return session;
}
if (TransportType.XHR_SEND.equals(transportType) || TransportType.JSONP_SEND.equals(transportType)) {
logger.debug(transportType + " did not find session");
return null;
}
synchronized (this.sessions) {
session = this.sessions.get(sessionId);
if (session != null) {
return session;
}
logger.debug("Creating new session with session id \"" + sessionId + "\"");
session = transportHandler.createSession(sessionId, this.sockJsHandler, this);
this.sessions.put(sessionId, session);
return session;
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.support;
import org.springframework.sockjs.server.TransportHandlerRegistrar;
import org.springframework.sockjs.server.TransportHandlerRegistry;
import org.springframework.sockjs.server.transport.EventSourceTransportHandler;
import org.springframework.sockjs.server.transport.HtmlFileTransportHandler;
import org.springframework.sockjs.server.transport.JsonpPollingTransportHandler;
import org.springframework.sockjs.server.transport.JsonpTransportHandler;
import org.springframework.sockjs.server.transport.XhrPollingTransportHandler;
import org.springframework.sockjs.server.transport.XhrStreamingTransportHandler;
import org.springframework.sockjs.server.transport.XhrTransportHandler;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class DefaultTransportHandlerRegistrar implements TransportHandlerRegistrar {
public void registerTransportHandlers(TransportHandlerRegistry registry) {
registry.registerHandler(new XhrPollingTransportHandler());
registry.registerHandler(new XhrTransportHandler());
registry.registerHandler(new JsonpPollingTransportHandler());
registry.registerHandler(new JsonpTransportHandler());
registry.registerHandler(new XhrStreamingTransportHandler());
registry.registerHandler(new EventSourceTransportHandler());
registry.registerHandler(new HtmlFileTransportHandler());
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.TransportHandler;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractHttpReceivingTransportHandler implements TransportHandler {
protected final Log logger = LogFactory.getLog(this.getClass());
// TODO: the JSON library used must be configurable
private final ObjectMapper objectMapper = new ObjectMapper();
public ObjectMapper getObjectMapper() {
return this.objectMapper;
}
@Override
public SockJsSessionSupport createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config) {
return null;
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
String[] messages = null;
try {
messages = readMessages(request);
}
catch (JsonMappingException ex) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getBody().write("Payload expected.".getBytes("UTF-8"));
return;
}
catch (IOException ex) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getBody().write("Broken JSON encoding.".getBytes("UTF-8"));
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Received messages: " + Arrays.asList(messages));
}
session.delegateMessages(messages);
response.setStatusCode(getResponseStatus());
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));
}
protected abstract String[] readMessages(ServerHttpRequest request) throws IOException;
protected abstract HttpStatus getResponseStatus();
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractHttpSendingTransportHandler implements TransportHandler {
protected final Log logger = LogFactory.getLog(this.getClass());
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
AbstractHttpServerSession httpServerSession = (AbstractHttpServerSession) session;
// Set content type before writing
response.getHeaders().setContentType(getContentType());
if (httpServerSession.isNew()) {
handleNewSession(request, response, httpServerSession);
}
else if (httpServerSession.isActive()) {
logger.debug("another " + getTransportType() + " connection still open: " + httpServerSession);
httpServerSession.writeFrame(response.getBody(), SockJsFrame.closeFrameAnotherConnectionOpen());
}
else {
logger.debug("starting " + getTransportType() + " async request");
httpServerSession.setCurrentRequest(request, response, getFrameFormat(request));
}
}
protected void handleNewSession(ServerHttpRequest request, ServerHttpResponse response,
AbstractHttpServerSession session) throws Exception {
logger.debug("Opening " + getTransportType() + " connection");
session.setFrameFormat(getFrameFormat(request));
session.writeFrame(response.getBody(), SockJsFrame.openFrame());
session.connectionInitialized();
}
protected abstract MediaType getContentType();
protected abstract FrameFormat getFrameFormat(ServerHttpRequest request);
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.springframework.http.server.AsyncServerHttpRequest;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.server.AbstractServerSession;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
import org.springframework.util.Assert;
/**
* An abstract base class for use with HTTP-based transports.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractHttpServerSession extends AbstractServerSession {
private FrameFormat frameFormat;
private final BlockingQueue<String> messageCache = new ArrayBlockingQueue<String>(100);
private AsyncServerHttpRequest asyncRequest;
private OutputStream outputStream;
public AbstractHttpServerSession(String sessionId, SockJsHandler delegate, SockJsConfiguration sockJsConfig) {
super(sessionId, delegate, sockJsConfig);
}
public void setFrameFormat(FrameFormat frameFormat) {
this.frameFormat = frameFormat;
}
public synchronized void setCurrentRequest(ServerHttpRequest request, ServerHttpResponse response,
FrameFormat frameFormat) throws IOException {
if (isClosed()) {
logger.debug("connection already closed");
writeFrame(response.getBody(), SockJsFrame.closeFrameGoAway());
return;
}
Assert.isInstanceOf(AsyncServerHttpRequest.class, request, "Expected AsyncServerHttpRequest");
this.asyncRequest = (AsyncServerHttpRequest) request;
this.asyncRequest.setTimeout(-1);
this.asyncRequest.startAsync();
this.outputStream = response.getBody();
this.frameFormat = frameFormat;
scheduleHeartbeat();
tryFlush();
}
public synchronized boolean isActive() {
return ((this.asyncRequest != null) && (!this.asyncRequest.isAsyncCompleted()));
}
protected BlockingQueue<String> getMessageCache() {
return this.messageCache;
}
protected final synchronized void sendMessageInternal(String message) {
// assert close() was not called
// threads: TH-Session-Endpoint or any other thread
this.messageCache.add(message);
tryFlush();
}
private void tryFlush() {
if (isActive() && !getMessageCache().isEmpty()) {
logger.trace("Flushing messages");
flush();
}
}
/**
* Only called if the connection is currently active
*/
protected abstract void flush();
protected void closeInternal() {
resetRequest();
}
protected synchronized void writeFrameInternal(SockJsFrame frame) throws IOException {
if (isActive()) {
writeFrame(this.outputStream, frame);
}
}
/**
* This method may be called by a {@link TransportHandler} to write a frame
* even when the connection is not active, as long as a valid OutputStream
* is provided.
*/
public void writeFrame(OutputStream outputStream, SockJsFrame frame) throws IOException {
frame = this.frameFormat.format(frame);
if (logger.isTraceEnabled()) {
logger.trace("Writing " + frame);
}
outputStream.write(frame.getContentBytes());
}
@Override
protected void deactivate() {
this.outputStream = null;
this.asyncRequest = null;
updateLastActiveTime();
}
protected synchronized void resetRequest() {
if (isActive()) {
this.asyncRequest.completeAsync();
}
this.outputStream = null;
this.asyncRequest = null;
updateLastActiveTime();
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.server.SockJsConfiguration;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractStreamingTransportHandler extends AbstractHttpSendingTransportHandler {
@Override
public StreamingHttpServerSession createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config) {
return new StreamingHttpServerSession(sessionId, handler, config);
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
writePrelude(request, response);
super.handleRequest(request, response, session);
}
protected abstract void writePrelude(ServerHttpRequest request, ServerHttpResponse response)
throws IOException;
@Override
protected void handleNewSession(ServerHttpRequest request, ServerHttpResponse response,
AbstractHttpServerSession session) throws IOException, Exception {
super.handleNewSession(request, response, session);
session.setCurrentRequest(request, response, getFrameFormat(request));
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsFrame.DefaultFrameFormat;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class EventSourceTransportHandler extends AbstractStreamingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.EVENT_SOURCE;
}
@Override
protected MediaType getContentType() {
return new MediaType("text", "event-stream", Charset.forName("UTF-8"));
}
@Override
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
response.getBody().write('\r');
response.getBody().write('\n');
response.getBody().flush();
}
@Override
protected FrameFormat getFrameFormat(ServerHttpRequest request) {
return new DefaultFrameFormat("data: %s\r\n\r\n");
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsFrame.DefaultFrameFormat;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
import org.springframework.util.StringUtils;
import org.springframework.web.util.JavaScriptUtils;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class HtmlFileTransportHandler extends AbstractStreamingTransportHandler {
private static final String PARTIAL_HTML_CONTENT;
static {
StringBuilder sb = new StringBuilder(
"<!doctype html>\n" +
"<html><head>\n" +
" <meta http-equiv=\"X-UA-Compatible\" content=\"IE=edge\" />\n" +
" <meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\" />\n" +
"</head><body><h2>Don't panic!</h2>\n" +
" <script>\n" +
" document.domain = document.domain;\n" +
" var c = parent.%s;\n" +
" c.start();\n" +
" function p(d) {c.message(d);};\n" +
" window.onload = function() {c.stop();};\n" +
" </script>"
);
// Safari needs at least 1024 bytes to parse the website.
// http://code.google.com/p/browsersec/wiki/Part2#Survey_of_content_sniffing_behaviors
int spaces = 1024 - sb.length();
for (int i=0; i < spaces; i++) {
sb.append(' ');
}
PARTIAL_HTML_CONTENT = sb.toString();
}
@Override
public TransportType getTransportType() {
return TransportType.HTML_FILE;
}
@Override
protected MediaType getContentType() {
return new MediaType("text", "html", Charset.forName("UTF-8"));
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
String callback = request.getQueryParams().getFirst("c");
if (! StringUtils.hasText(callback)) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getBody().write("\"callback\" parameter required".getBytes("UTF-8"));
return;
}
super.handleRequest(request, response, session);
}
@Override
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
// we already validated the parameter..
String callback = request.getQueryParams().getFirst("c");
String html = String.format(PARTIAL_HTML_CONTENT, callback);
response.getBody().write(html.getBytes("UTF-8"));
response.getBody().flush();
}
@Override
protected FrameFormat getFrameFormat(ServerHttpRequest request) {
return new DefaultFrameFormat("<script>\np(\"%s\");\n</script>\r\n") {
@Override
protected String preProcessContent(String content) {
return JavaScriptUtils.javaScriptEscape(content);
}
};
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.nio.charset.Charset;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
import org.springframework.util.StringUtils;
import org.springframework.web.util.JavaScriptUtils;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class JsonpPollingTransportHandler extends AbstractHttpSendingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.JSONP;
}
@Override
protected MediaType getContentType() {
return new MediaType("application", "javascript", Charset.forName("UTF-8"));
}
@Override
public PollingHttpServerSession createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config) {
return new PollingHttpServerSession(sessionId, handler, config);
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
String callback = request.getQueryParams().getFirst("c");
if (! StringUtils.hasText(callback)) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getBody().write("\"callback\" parameter required".getBytes("UTF-8"));
return;
}
super.handleRequest(request, response, session);
}
@Override
protected FrameFormat getFrameFormat(ServerHttpRequest request) {
// we already validated the parameter..
String callback = request.getQueryParams().getFirst("c");
return new SockJsFrame.DefaultFrameFormat(callback + "(\"%s\");\r\n") {
@Override
protected String preProcessContent(String content) {
return JavaScriptUtils.javaScriptEscape(content);
}
};
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
public class JsonpTransportHandler extends AbstractHttpReceivingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.JSONP_SEND;
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsSessionSupport sockJsSession) throws Exception {
if (MediaType.APPLICATION_FORM_URLENCODED.equals(request.getHeaders().getContentType())) {
if (request.getQueryParams().getFirst("d") == null) {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getBody().write("Payload expected.".getBytes("UTF-8"));
return;
}
}
super.handleRequest(request, response, sockJsSession);
response.getBody().write("ok".getBytes("UTF-8"));
}
@Override
protected String[] readMessages(ServerHttpRequest request) throws IOException {
if (MediaType.APPLICATION_FORM_URLENCODED.equals(request.getHeaders().getContentType())) {
String d = request.getQueryParams().getFirst("d");
return getObjectMapper().readValue(d, String[].class);
}
else {
return getObjectMapper().readValue(request.getBody(), String[].class);
}
}
@Override
protected HttpStatus getResponseStatus() {
return HttpStatus.OK;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
public class PollingHttpServerSession extends AbstractHttpServerSession {
public PollingHttpServerSession(String sessionId, SockJsHandler delegate, SockJsConfiguration sockJsConfig) {
super(sessionId, delegate, sockJsConfig);
}
@Override
protected void flush() {
cancelHeartbeat();
String[] messages = getMessageCache().toArray(new String[getMessageCache().size()]);
getMessageCache().clear();
writeFrame(SockJsFrame.messageFrame(messages));
}
@Override
protected void writeFrame(SockJsFrame frame) {
super.writeFrame(frame);
resetRequest();
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.io.OutputStream;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame;
public class StreamingHttpServerSession extends AbstractHttpServerSession {
private int byteCount;
public StreamingHttpServerSession(String sessionId, SockJsHandler delegate, SockJsConfiguration sockJsConfig) {
super(sessionId, delegate, sockJsConfig);
}
protected void flush() {
cancelHeartbeat();
do {
String message = getMessageCache().poll();
SockJsFrame frame = SockJsFrame.messageFrame(message);
writeFrame(frame);
this.byteCount += frame.getContentBytes().length + 1;
if (logger.isTraceEnabled()) {
logger.trace(this.byteCount + " bytes written, " + getMessageCache().size() + " more messages");
}
if (this.byteCount >= getSockJsConfig().getStreamBytesLimit()) {
if (logger.isTraceEnabled()) {
logger.trace("Streamed bytes limit reached. Recycling current request");
}
resetRequest();
break;
}
} while (!getMessageCache().isEmpty());
scheduleHeartbeat();
}
@Override
protected synchronized void resetRequest() {
super.resetRequest();
this.byteCount = 0;
}
@Override
public void writeFrame(OutputStream outputStream, SockJsFrame frame) throws IOException {
super.writeFrame(outputStream, frame);
outputStream.flush();
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.SockJsSessionSupport;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsWebSocketSessionAdapter;
import org.springframework.sockjs.server.TransportHandler;
import org.springframework.sockjs.server.WebSocketSockJsHandlerAdapter;
import org.springframework.websocket.server.HandshakeRequestHandler;
import org.springframework.websocket.server.endpoint.EndpointHandshakeRequestHandler;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketTransportHandler implements TransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.WEBSOCKET;
}
@Override
public SockJsSessionSupport createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config) {
return new SockJsWebSocketSessionAdapter(sessionId, handler, config);
}
@Override
public void handleRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsSessionSupport session)
throws Exception {
SockJsWebSocketSessionAdapter sockJsSession = (SockJsWebSocketSessionAdapter) session;
WebSocketSockJsHandlerAdapter webSocketHandler = new WebSocketSockJsHandlerAdapter(sockJsSession);
HandshakeRequestHandler handshakeRequestHandler = new EndpointHandshakeRequestHandler(webSocketHandler);
handshakeRequestHandler.doHandshake(request, response);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.nio.charset.Charset;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.sockjs.SockJsHandler;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsConfiguration;
import org.springframework.sockjs.server.SockJsFrame.DefaultFrameFormat;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class XhrPollingTransportHandler extends AbstractHttpSendingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.XHR;
}
@Override
protected MediaType getContentType() {
return new MediaType("application", "javascript", Charset.forName("UTF-8"));
}
@Override
protected FrameFormat getFrameFormat(ServerHttpRequest request) {
return new DefaultFrameFormat("%s\n");
}
public PollingHttpServerSession createSession(String sessionId, SockJsHandler handler, SockJsConfiguration config) {
return new PollingHttpServerSession(sessionId, handler, config);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import java.nio.charset.Charset;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.sockjs.TransportType;
import org.springframework.sockjs.server.SockJsFrame.DefaultFrameFormat;
import org.springframework.sockjs.server.SockJsFrame.FrameFormat;
/**
* TODO
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class XhrStreamingTransportHandler extends AbstractStreamingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.XHR_STREAMING;
}
@Override
protected MediaType getContentType() {
return new MediaType("application", "javascript", Charset.forName("UTF-8"));
}
@Override
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
for (int i=0; i < 2048; i++) {
response.getBody().write('h');
}
response.getBody().write('\n');
response.getBody().flush();
}
@Override
protected FrameFormat getFrameFormat(ServerHttpRequest request) {
return new DefaultFrameFormat("%s\n");
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.sockjs.server.transport;
import java.io.IOException;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.sockjs.TransportType;
public class XhrTransportHandler extends AbstractHttpReceivingTransportHandler {
@Override
public TransportType getTransportType() {
return TransportType.XHR_SEND;
}
@Override
protected String[] readMessages(ServerHttpRequest request) throws IOException {
return getObjectMapper().readValue(request.getBody(), String[].class);
}
@Override
protected HttpStatus getResponseStatus() {
return HttpStatus.NO_CONTENT;
}
}

View File

@ -26,14 +26,14 @@ import java.io.InputStream;
*/
public interface WebSocketHandler {
void newSession(Session session) throws Exception;
void newSession(WebSocketSession session) throws Exception;
void handleTextMessage(Session session, String message) throws Exception;
void handleTextMessage(WebSocketSession session, String message) throws Exception;
void handleBinaryMessage(Session session, InputStream message) throws Exception;
void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception;
void handleException(Session session, Throwable exception);
void handleException(WebSocketSession session, Throwable exception);
void sessionClosed(Session session, int statusCode, String reason) throws Exception;
void sessionClosed(WebSocketSession session, int statusCode, String reason) throws Exception;
}

View File

@ -26,23 +26,23 @@ import java.io.InputStream;
public class WebSocketHandlerAdapter implements WebSocketHandler {
@Override
public void newSession(Session session) throws Exception {
public void newSession(WebSocketSession session) throws Exception {
}
@Override
public void handleTextMessage(Session session, String message) throws Exception {
public void handleTextMessage(WebSocketSession session, String message) throws Exception {
}
@Override
public void handleBinaryMessage(Session session, InputStream message) throws Exception {
public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception {
}
@Override
public void handleException(Session session, Throwable exception) {
public void handleException(WebSocketSession session, Throwable exception) {
}
@Override
public void sessionClosed(Session session, int statusCode, String reason) throws Exception {
public void sessionClosed(WebSocketSession session, int statusCode, String reason) throws Exception {
}
}

View File

@ -23,10 +23,12 @@ package org.springframework.websocket;
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface Session {
public interface WebSocketSession {
void sendText(String text) throws Exception;
void close(int code, String reason) throws Exception;
void close();
void close(int code, String reason);
}

View File

@ -27,7 +27,7 @@ import javax.websocket.MessageHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;
import org.springframework.websocket.Session;
import org.springframework.websocket.WebSocketSession;
import org.springframework.websocket.WebSocketHandler;
@ -42,7 +42,7 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
private final WebSocketHandler webSocketHandler;
private final Map<String, Session> sessionMap = new ConcurrentHashMap<String, Session>();
private final Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<String, WebSocketSession>();
public StandardWebSocketHandlerAdapter(WebSocketHandler webSocketHandler) {
@ -50,13 +50,13 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
}
@Override
public void onOpen(javax.websocket.Session sourceSession, EndpointConfig config) {
logger.debug("New WebSocket session: " + sourceSession);
public void onOpen(javax.websocket.Session session, EndpointConfig config) {
logger.debug("New WebSocket session: " + session);
try {
Session session = new StandardSessionAdapter(sourceSession);
this.sessionMap.put(sourceSession.getId(), session);
sourceSession.addMessageHandler(new StandardMessageHandler(sourceSession.getId()));
this.webSocketHandler.newSession(session);
WebSocketSession webSocketSession = new WebSocketStandardSessionAdapter(session);
this.sessionMap.put(session.getId(), webSocketSession);
session.addMessageHandler(new StandardMessageHandler(session.getId()));
this.webSocketHandler.newSession(webSocketSession);
}
catch (Throwable ex) {
// TODO
@ -65,18 +65,18 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
}
@Override
public void onClose(javax.websocket.Session sourceSession, CloseReason closeReason) {
String id = sourceSession.getId();
public void onClose(javax.websocket.Session session, CloseReason closeReason) {
String id = session.getId();
if (logger.isDebugEnabled()) {
logger.debug("Closing session: " + sourceSession + ", " + closeReason);
logger.debug("Closing session: " + session + ", " + closeReason);
}
try {
Session session = getSession(id);
WebSocketSession webSocketSession = getSession(id);
this.sessionMap.remove(id);
int code = closeReason.getCloseCode().getCode();
String reason = closeReason.getReasonPhrase();
session.close(code, reason);
this.webSocketHandler.sessionClosed(session, code, reason);
webSocketSession.close(code, reason);
this.webSocketHandler.sessionClosed(webSocketSession, code, reason);
}
catch (Throwable ex) {
// TODO
@ -85,11 +85,11 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
}
@Override
public void onError(javax.websocket.Session sourceSession, Throwable exception) {
logger.error("Error for WebSocket session: " + sourceSession.getId(), exception);
public void onError(javax.websocket.Session session, Throwable exception) {
logger.error("Error for WebSocket session: " + session.getId(), exception);
try {
Session session = getSession(sourceSession.getId());
this.webSocketHandler.handleException(session, exception);
WebSocketSession webSocketSession = getSession(session.getId());
this.webSocketHandler.handleException(webSocketSession, exception);
}
catch (Throwable ex) {
// TODO
@ -97,28 +97,28 @@ public class StandardWebSocketHandlerAdapter extends Endpoint {
}
}
private Session getSession(String sourceSessionId) {
Session session = this.sessionMap.get(sourceSessionId);
Assert.notNull(session, "No session");
return session;
private WebSocketSession getSession(String sourceSessionId) {
WebSocketSession webSocketSession = this.sessionMap.get(sourceSessionId);
Assert.notNull(webSocketSession, "No session");
return webSocketSession;
}
private class StandardMessageHandler implements MessageHandler.Whole<String> {
private final String sourceSessionId;
private final String sessionId;
public StandardMessageHandler(String sourceSessionId) {
this.sourceSessionId = sourceSessionId;
public StandardMessageHandler(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void onMessage(String message) {
if (logger.isTraceEnabled()) {
logger.trace("Message for session [" + this.sourceSessionId + "]: " + message);
logger.trace("Message for session [" + this.sessionId + "]: " + message);
}
try {
Session session = getSession(this.sourceSessionId);
WebSocketSession session = getSession(this.sessionId);
StandardWebSocketHandlerAdapter.this.webSocketHandler.handleTextMessage(session, message);
}
catch (Throwable ex) {

View File

@ -18,7 +18,7 @@ package org.springframework.websocket.endpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.websocket.Session;
import org.springframework.websocket.WebSocketSession;
/**
@ -26,26 +26,33 @@ import org.springframework.websocket.Session;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StandardSessionAdapter implements Session {
public class WebSocketStandardSessionAdapter implements WebSocketSession {
private static Log logger = LogFactory.getLog(StandardSessionAdapter.class);
private static Log logger = LogFactory.getLog(WebSocketStandardSessionAdapter.class);
private javax.websocket.Session sourceSession;
private javax.websocket.Session session;
public StandardSessionAdapter(javax.websocket.Session sourceSession) {
this.sourceSession = sourceSession;
public WebSocketStandardSessionAdapter(javax.websocket.Session session) {
this.session = session;
}
@Override
public void sendText(String text) throws Exception {
logger.trace("Sending text message: " + text);
this.sourceSession.getBasicRemote().sendText(text);
// TODO: check closed
this.session.getBasicRemote().sendText(text);
}
@Override
public void close(int code, String reason) throws Exception {
this.sourceSession = null;
public void close() {
// TODO: delegate with code and reason
this.session = null;
}
@Override
public void close(int code, String reason) {
this.session = null;
}
}

View File

@ -35,6 +35,7 @@ import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.web.context.ContextLoader;
import org.springframework.websocket.WebSocketHandler;
import org.springframework.websocket.endpoint.StandardWebSocketHandlerAdapter;
@ -69,6 +70,8 @@ public class EndpointRegistration implements ServerEndpointConfig, BeanFactoryAw
private final Configurator configurator = new Configurator() {};
// ContextLoader.getCurrentWebApplicationContext().getAutowireCapableBeanFactory().createBean(Class<T>)
public EndpointRegistration(String path, String beanName) {
Assert.hasText(path, "path must not be empty");
Assert.notNull(beanName, "beanName is required");