Polishing
This commit is contained in:
parent
5f60d700a1
commit
05d475a275
|
|
@ -450,10 +450,11 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
||||||
/**
|
/**
|
||||||
* Configure the reply destination type. By default, the configured {@code pubSubDomain}
|
* Configure the reply destination type. By default, the configured {@code pubSubDomain}
|
||||||
* value is used (see {@link #isPubSubDomain()}.
|
* value is used (see {@link #isPubSubDomain()}.
|
||||||
* <p>This setting primarily indicates what type of destination to resolve
|
* <p>This setting primarily indicates what type of destination to resolve if dynamic
|
||||||
* if dynamic destinations are enabled.
|
* destinations are enabled.
|
||||||
* @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link javax.jms.Topic Topics}),
|
* @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}),
|
||||||
* "false" for the Point-to-Point domain ({@link javax.jms.Queue Queues})
|
* "false" for the Point-to-Point domain ({@link Queue Queues})
|
||||||
|
* @since 4.2
|
||||||
* @see #setDestinationResolver
|
* @see #setDestinationResolver
|
||||||
*/
|
*/
|
||||||
public void setReplyPubSubDomain(boolean replyPubSubDomain) {
|
public void setReplyPubSubDomain(boolean replyPubSubDomain) {
|
||||||
|
|
@ -462,8 +463,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used
|
* Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used
|
||||||
* for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues}) is
|
* for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues})
|
||||||
* used.
|
* is used.
|
||||||
|
* @since 4.2
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean isReplyPubSubDomain() {
|
public boolean isReplyPubSubDomain() {
|
||||||
|
|
|
||||||
|
|
@ -21,9 +21,9 @@ import java.util.Set;
|
||||||
/**
|
/**
|
||||||
* A contract for adding and removing user sessions.
|
* A contract for adding and removing user sessions.
|
||||||
*
|
*
|
||||||
* <p>As of 4.2 this interface extends {@link SimpUserRegistry}.
|
* <p>As of 4.2, this interface is replaced by {@link SimpUserRegistry},
|
||||||
* exposing methods to return all registered users as well as to provide more
|
* exposing methods to return all registered users as well as to provide
|
||||||
* extensive information for each user.
|
* more extensive information for each user.
|
||||||
*
|
*
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 4.0
|
* @since 4.0
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.xml.transform.Source;
|
import javax.xml.transform.Source;
|
||||||
|
|
||||||
import org.springframework.core.ParameterizedTypeReference;
|
import org.springframework.core.ParameterizedTypeReference;
|
||||||
|
|
|
||||||
|
|
@ -218,8 +218,11 @@ public class RequestParamMethodArgumentResolver extends AbstractNamedValueMethod
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isMultipartFileCollection(MethodParameter parameter) {
|
private boolean isMultipartFileCollection(MethodParameter parameter) {
|
||||||
Class<?> collectionType = getCollectionParameterType(parameter);
|
return (MultipartFile.class == getCollectionParameterType(parameter));
|
||||||
return (collectionType != null && MultipartFile.class == collectionType);
|
}
|
||||||
|
|
||||||
|
private boolean isMultipartFileArray(MethodParameter parameter) {
|
||||||
|
return (MultipartFile.class == parameter.getParameterType().getComponentType());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isPartCollection(MethodParameter parameter) {
|
private boolean isPartCollection(MethodParameter parameter) {
|
||||||
|
|
@ -232,11 +235,6 @@ public class RequestParamMethodArgumentResolver extends AbstractNamedValueMethod
|
||||||
return (paramType != null && "javax.servlet.http.Part".equals(paramType.getName()));
|
return (paramType != null && "javax.servlet.http.Part".equals(paramType.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isMultipartFileArray(MethodParameter parameter) {
|
|
||||||
Class<?> paramType = parameter.getParameterType().getComponentType();
|
|
||||||
return (paramType != null && MultipartFile.class == paramType);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Class<?> getCollectionParameterType(MethodParameter parameter) {
|
private Class<?> getCollectionParameterType(MethodParameter parameter) {
|
||||||
Class<?> paramType = parameter.getParameterType();
|
Class<?> paramType = parameter.getParameterType();
|
||||||
if (Collection.class == paramType || List.class.isAssignableFrom(paramType)){
|
if (Collection.class == paramType || List.class.isAssignableFrom(paramType)){
|
||||||
|
|
|
||||||
|
|
@ -135,9 +135,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
* Since a STOMP message can be received in multiple WebSocket messages,
|
* Since a STOMP message can be received in multiple WebSocket messages,
|
||||||
* buffering may be required and therefore it is necessary to know the maximum
|
* buffering may be required and therefore it is necessary to know the maximum
|
||||||
* allowed message size.
|
* allowed message size.
|
||||||
*
|
|
||||||
* <p>By default this property is set to 64K.
|
* <p>By default this property is set to 64K.
|
||||||
*
|
|
||||||
* @since 4.0.3
|
* @since 4.0.3
|
||||||
*/
|
*/
|
||||||
public void setMessageSizeLimit(int messageSizeLimit) {
|
public void setMessageSizeLimit(int messageSizeLimit) {
|
||||||
|
|
@ -146,14 +144,12 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the configured message buffer size limit in bytes.
|
* Get the configured message buffer size limit in bytes.
|
||||||
*
|
|
||||||
* @since 4.0.3
|
* @since 4.0.3
|
||||||
*/
|
*/
|
||||||
public int getMessageSizeLimit() {
|
public int getMessageSizeLimit() {
|
||||||
return this.messageSizeLimit;
|
return this.messageSizeLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide a registry with which to register active user session ids.
|
* Provide a registry with which to register active user session ids.
|
||||||
* @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
|
* @see org.springframework.messaging.simp.user.UserDestinationMessageHandler
|
||||||
|
|
@ -179,7 +175,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
|
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
|
||||||
* messages created from decoded STOMP frames and other messages sent to the
|
* messages created from decoded STOMP frames and other messages sent to the
|
||||||
* client inbound channel.
|
* client inbound channel.
|
||||||
*
|
|
||||||
* <p>By default this property is not set.
|
* <p>By default this property is not set.
|
||||||
*/
|
*/
|
||||||
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
|
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
|
||||||
|
|
@ -188,7 +183,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the configured header initializer.
|
* Return the configured header initializer.
|
||||||
*/
|
*/
|
||||||
public MessageHeaderInitializer getHeaderInitializer() {
|
public MessageHeaderInitializer getHeaderInitializer() {
|
||||||
return this.headerInitializer;
|
return this.headerInitializer;
|
||||||
|
|
@ -259,7 +254,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
try {
|
try {
|
||||||
StompHeaderAccessor headerAccessor =
|
StompHeaderAccessor headerAccessor =
|
||||||
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
||||||
|
|
||||||
Principal user = session.getPrincipal();
|
Principal user = session.getPrincipal();
|
||||||
|
|
||||||
headerAccessor.setSessionId(session.getId());
|
headerAccessor.setSessionId(session.getId());
|
||||||
|
|
@ -314,34 +308,37 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
sendErrorMessage(session, ex);
|
sendErrorMessage(session, ex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message<byte[]> message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex);
|
Message<byte[]> message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex);
|
||||||
if (message == null) {
|
if (message == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
||||||
Assert.notNull(accessor, "Expected STOMP headers.");
|
Assert.notNull(accessor, "Expected STOMP headers");
|
||||||
sendToClient(session, accessor, message.getPayload());
|
sendToClient(session, accessor, message.getPayload());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked when no
|
* Invoked when no
|
||||||
* {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler} is
|
* {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler}
|
||||||
* configured to send an ERROR frame to the client.
|
* is configured to send an ERROR frame to the client.
|
||||||
* @deprecated as of 4.2 this method is deprecated in favor of
|
* @deprecated as of Spring 4.2, in favor of
|
||||||
* {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring} a
|
* {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring}
|
||||||
* {@code StompSubProtocolErrorHandler}.
|
* a {@code StompSubProtocolErrorHandler}
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
|
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
|
||||||
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
|
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
|
||||||
headerAccessor.setMessage(error.getMessage());
|
headerAccessor.setMessage(error.getMessage());
|
||||||
|
|
||||||
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
|
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
|
||||||
try {
|
try {
|
||||||
session.sendMessage(new TextMessage(bytes));
|
session.sendMessage(new TextMessage(bytes));
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
// Could be part of normal workflow (e.g. browser tab closed)
|
// Could be part of normal workflow (e.g. browser tab closed)
|
||||||
logger.debug("Failed to send STOMP ERROR to client.", ex);
|
logger.debug("Failed to send STOMP ERROR to client", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -349,6 +346,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
if (this.immutableMessageInterceptorPresent != null) {
|
if (this.immutableMessageInterceptorPresent != null) {
|
||||||
return this.immutableMessageInterceptorPresent;
|
return this.immutableMessageInterceptorPresent;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (channel instanceof AbstractMessageChannel) {
|
if (channel instanceof AbstractMessageChannel) {
|
||||||
for (ChannelInterceptor interceptor : ((AbstractMessageChannel) channel).getInterceptors()) {
|
for (ChannelInterceptor interceptor : ((AbstractMessageChannel) channel).getInterceptors()) {
|
||||||
if (interceptor instanceof ImmutableMessageChannelInterceptor) {
|
if (interceptor instanceof ImmutableMessageChannelInterceptor) {
|
||||||
|
|
@ -366,7 +364,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
this.eventPublisher.publishEvent(event);
|
this.eventPublisher.publishEvent(event);
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
logger.error("Error publishing " + event + ".", ex);
|
logger.error("Error publishing " + event, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -380,8 +378,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
logger.error("Expected byte[] payload. Ignoring " + message + ".");
|
logger.error("Expected byte[] payload. Ignoring " + message + ".");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message);
|
StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message);
|
||||||
StompCommand command = stompAccessor.getCommand();
|
StompCommand command = stompAccessor.getCommand();
|
||||||
|
|
||||||
if (StompCommand.MESSAGE.equals(command)) {
|
if (StompCommand.MESSAGE.equals(command)) {
|
||||||
if (stompAccessor.getSubscriptionId() == null) {
|
if (stompAccessor.getSubscriptionId() == null) {
|
||||||
logger.warn("No STOMP \"subscription\" header in " + message);
|
logger.warn("No STOMP \"subscription\" header in " + message);
|
||||||
|
|
@ -414,7 +414,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
|
if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
|
||||||
Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
|
Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
|
||||||
stompAccessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
|
stompAccessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
|
||||||
Assert.notNull(stompAccessor, "Expected STOMP headers.");
|
Assert.notNull(stompAccessor, "Expected STOMP headers");
|
||||||
payload = errorMessage.getPayload();
|
payload = errorMessage.getPayload();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -442,7 +442,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
// Could be part of normal workflow (e.g. browser tab closed)
|
// Could be part of normal workflow (e.g. browser tab closed)
|
||||||
logger.debug("Failed to send WebSocket message to client in session " + session.getId() + ".", ex);
|
logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex);
|
||||||
command = StompCommand.ERROR;
|
command = StompCommand.ERROR;
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
|
@ -461,7 +461,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
|
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
|
||||||
if (accessor == null) {
|
if (accessor == null) {
|
||||||
// Shouldn't happen (only broker broadcasts directly to clients)
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
||||||
throw new IllegalStateException("No header accessor in " + message + ".");
|
throw new IllegalStateException("No header accessor in " + message);
|
||||||
}
|
}
|
||||||
StompHeaderAccessor stompAccessor;
|
StompHeaderAccessor stompAccessor;
|
||||||
if (accessor instanceof StompHeaderAccessor) {
|
if (accessor instanceof StompHeaderAccessor) {
|
||||||
|
|
@ -487,7 +487,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
else {
|
else {
|
||||||
// Shouldn't happen (only broker broadcasts directly to clients)
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
"Unexpected header accessor type: " + accessor.getClass() + " in " + message + ".");
|
"Unexpected header accessor type: " + accessor.getClass() + " in " + message);
|
||||||
}
|
}
|
||||||
return stompAccessor;
|
return stompAccessor;
|
||||||
}
|
}
|
||||||
|
|
@ -515,7 +515,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER);
|
long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER);
|
||||||
if (heartbeat != null) {
|
if (heartbeat != null) {
|
||||||
connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]);
|
connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]);
|
||||||
} else {
|
}
|
||||||
|
else {
|
||||||
connectedHeaders.setHeartbeat(0, 0);
|
connectedHeaders.setHeartbeat(0, 0);
|
||||||
}
|
}
|
||||||
return connectedHeaders;
|
return connectedHeaders;
|
||||||
|
|
@ -537,6 +538,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
this.userSessionRegistry.registerSessionId(userName, session.getId());
|
this.userSessionRegistry.registerSessionId(userName, session.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
long[] heartbeat = accessor.getHeartbeat();
|
long[] heartbeat = accessor.getHeartbeat();
|
||||||
if (heartbeat[1] > 0) {
|
if (heartbeat[1] > 0) {
|
||||||
session = WebSocketSessionDecorator.unwrap(session);
|
session = WebSocketSessionDecorator.unwrap(session);
|
||||||
|
|
@ -544,6 +546,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
((SockJsSession) session).disableHeartbeat();
|
((SockJsSession) session).disableHeartbeat();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return accessor;
|
return accessor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -571,11 +574,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
@Override
|
@Override
|
||||||
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
|
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
|
||||||
this.decoders.remove(session.getId());
|
this.decoders.remove(session.getId());
|
||||||
|
|
||||||
Principal principal = session.getPrincipal();
|
Principal principal = session.getPrincipal();
|
||||||
if (principal != null && this.userSessionRegistry != null) {
|
if (principal != null && this.userSessionRegistry != null) {
|
||||||
String userName = getSessionRegistryUserName(principal);
|
String userName = getSessionRegistryUserName(principal);
|
||||||
this.userSessionRegistry.unregisterSessionId(userName, session.getId());
|
this.userSessionRegistry.unregisterSessionId(userName, session.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
Message<byte[]> message = createDisconnectMessage(session);
|
Message<byte[]> message = createDisconnectMessage(session);
|
||||||
SimpAttributes simpAttributes = SimpAttributes.fromMessage(message);
|
SimpAttributes simpAttributes = SimpAttributes.fromMessage(message);
|
||||||
try {
|
try {
|
||||||
|
|
@ -608,7 +613,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
return "StompSubProtocolHandler" + getSupportedProtocols();
|
return "StompSubProtocolHandler" + getSupportedProtocols();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Stats {
|
|
||||||
|
private static class Stats {
|
||||||
|
|
||||||
private final AtomicInteger connect = new AtomicInteger();
|
private final AtomicInteger connect = new AtomicInteger();
|
||||||
|
|
||||||
|
|
@ -616,7 +622,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
|
|
||||||
private final AtomicInteger disconnect = new AtomicInteger();
|
private final AtomicInteger disconnect = new AtomicInteger();
|
||||||
|
|
||||||
|
|
||||||
public void incrementConnectCount() {
|
public void incrementConnectCount() {
|
||||||
this.connect.incrementAndGet();
|
this.connect.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
@ -629,7 +634,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
||||||
this.disconnect.incrementAndGet();
|
this.disconnect.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
|
return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
|
||||||
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
|
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue