Polishing contribution

Renaming, trimming of method parameters, minor refactoring of helper
methods, comments, etc. Completely functionally neutral.

See gh-25298
This commit is contained in:
Rossen Stoyanchev 2020-07-15 22:44:12 +03:00
parent 524ca1a676
commit 44f1f94f97
1 changed files with 145 additions and 154 deletions

View File

@ -36,7 +36,6 @@ import org.springframework.expression.TypedValue;
import org.springframework.expression.spel.SpelEvaluationException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@ -86,7 +85,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final DestinationCache destinationCache = new DestinationCache();
private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
private final SessionRegistry sessionRegistry = new SessionRegistry();
/**
@ -147,46 +146,47 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
@Override
protected void addSubscriptionInternal(@NonNull String sessionId, @NonNull String subscriptionId,
@NonNull String destination, @NonNull Message<?> message) {
Expression expression = getSelectorExpression(message.getHeaders());
boolean isAntPattern = this.pathMatcher.isPattern(destination);
Subscription subscription = new Subscription(subscriptionId, expression, destination, isAntPattern);
protected void addSubscriptionInternal(
String sessionId, String subscriptionId, String destination, Message<?> message) {
Subscription previousValue = this.subscriptionRegistry.addSubscription(sessionId, subscriptionId, subscription);
if (previousValue == null) {
this.destinationCache.updateAfterNewSubscription(destination, isAntPattern, sessionId, subscriptionId);
}
boolean isPattern = this.pathMatcher.isPattern(destination);
Expression expression = getSelectorExpression(message.getHeaders());
Subscription subscription = new Subscription(subscriptionId, destination, isPattern, expression);
this.sessionRegistry.addSubscription(sessionId, subscription);
this.destinationCache.updateAfterNewSubscription(sessionId, subscription);
}
@Nullable
private Expression getSelectorExpression(MessageHeaders headers) {
if (getSelectorHeaderName() == null) {
return null;
}
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector == null) {
return null;
}
Expression expression = null;
if (getSelectorHeaderName() != null) {
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector != null) {
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
}
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
}
}
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
}
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
}
}
return expression;
}
@Override
protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) {
SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
protected void removeSubscriptionInternal(String sessionId, String subscriptionId, Message<?> message) {
SessionInfo info = this.sessionRegistry.getSession(sessionId);
if (info != null) {
Subscription subscription = info.removeSubscription(subsId);
Subscription subscription = info.removeSubscription(subscriptionId);
if (subscription != null) {
this.destinationCache.updateAfterRemovedSubscription(sessionId, subscription);
}
@ -195,45 +195,41 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public void unregisterAllSubscriptions(String sessionId) {
SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
SessionInfo info = this.sessionRegistry.removeSubscriptions(sessionId);
if (info != null) {
this.destinationCache.updateAfterRemovedSession(sessionId, info.getSubscriptions());
this.destinationCache.updateAfterRemovedSession(sessionId, info);
}
}
@Override
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
return filterSubscriptions(result, message);
}
private MultiValueMap<String, String> filterSubscriptions(
MultiValueMap<String, String> allMatches, Message<?> message) {
MultiValueMap<String, String> allMatches = this.destinationCache.getSubscriptions(destination);
if (!this.selectorHeaderInUse) {
return allMatches;
}
MultiValueMap<String, String> result = new LinkedMultiValueMap<>(allMatches.size());
allMatches.forEach((sessionId, subscriptionsIds) -> {
SessionSubscriptionInfo subscriptions = this.subscriptionRegistry.getSubscriptions(sessionId);
if (subscriptions != null) {
for (String subscriptionId : subscriptionsIds) {
Subscription subscription = subscriptions.getSubscription(subscriptionId);
if (subscription != null && evaluateExpression(subscription.getSelectorExpression(), message)) {
allMatches.forEach((sessionId, subscriptionIds) -> {
SessionInfo info = this.sessionRegistry.getSession(sessionId);
if (info != null) {
for (String subscriptionId : subscriptionIds) {
Subscription subscription = info.getSubscription(subscriptionId);
if (subscription != null && evaluateExpression(subscription.getSelector(), message)) {
result.add(sessionId, subscription.getId());
}
}
}
});
return result;
}
private boolean evaluateExpression(@Nullable Expression expression, Message<?> message) {
boolean result = false;
if (expression == null) {
return true;
}
try {
if (expression == null || Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) {
result = true;
Boolean result = expression.getValue(messageEvalContext, message, Boolean.class);
if (Boolean.TRUE.equals(result)) {
return true;
}
}
catch (SpelEvaluationException ex) {
@ -244,67 +240,68 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
catch (Throwable ex) {
logger.debug("Failed to evaluate selector", ex);
}
return result;
return false;
}
/**
* A cache for destinations previously resolved via
* Cache for destinations resolved previously via
* {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)}.
*/
private final class DestinationCache {
/** Map from destination to {@code <sessionId, subscriptionId>} for fast look-ups. */
// destination -> [sessionId -> subscriptionId's]
private final Map<String, LinkedMultiValueMap<String, String>> destinationCache =
new ConcurrentHashMap<>(DEFAULT_CACHE_LIMIT);
private final Queue<String> cacheEvictionPolicy = new ConcurrentLinkedQueue<>();
private final AtomicInteger cacheSize = new AtomicInteger();
private final Queue<String> cacheEvictionPolicy = new ConcurrentLinkedQueue<>();
public LinkedMultiValueMap<String, String> getSubscriptions(String destination) {
LinkedMultiValueMap<String, String> subscriptions = this.destinationCache.get(destination);
if (subscriptions == null) {
subscriptions = this.destinationCache.computeIfAbsent(destination, dest -> {
LinkedMultiValueMap<String, String> sessionSubscriptions = calculateSubscriptions(destination);
this.cacheEvictionPolicy.add(destination);
LinkedMultiValueMap<String, String> sessionIdToSubscriptionIds = this.destinationCache.get(destination);
if (sessionIdToSubscriptionIds == null) {
sessionIdToSubscriptionIds = this.destinationCache.computeIfAbsent(destination, _destination -> {
LinkedMultiValueMap<String, String> matches = computeMatchingSubscriptions(destination);
this.cacheSize.incrementAndGet();
return sessionSubscriptions;
this.cacheEvictionPolicy.add(destination);
return matches;
});
ensureCacheLimit();
}
return subscriptions;
return sessionIdToSubscriptionIds;
}
@NonNull
private LinkedMultiValueMap<String, String> calculateSubscriptions(String destination) {
LinkedMultiValueMap<String, String> sessionsToSubscriptions = new LinkedMultiValueMap<>();
DefaultSubscriptionRegistry.this.subscriptionRegistry.forEachSubscription((sessionId, subscriptionDetail) -> {
if (subscriptionDetail.isAntPattern()) {
if (pathMatcher.match(subscriptionDetail.getDestination(), destination)) {
sessionsToSubscriptions.compute(sessionId, (s, subscriptions) ->
addToList(subscriptionDetail.getId(), subscriptions));
private LinkedMultiValueMap<String, String> computeMatchingSubscriptions(String destination) {
LinkedMultiValueMap<String, String> sessionIdToSubscriptionIds = new LinkedMultiValueMap<>();
DefaultSubscriptionRegistry.this.sessionRegistry.forEachSubscription((sessionId, subscription) -> {
if (subscription.isPattern()) {
if (pathMatcher.match(subscription.getDestination(), destination)) {
addMatchedSubscriptionId(sessionIdToSubscriptionIds, sessionId, subscription.getId());
}
}
else if (destination.equals(subscriptionDetail.getDestination())) {
sessionsToSubscriptions.compute(sessionId, (s, subscriptions) ->
addToList(subscriptionDetail.getId(), subscriptions));
else if (destination.equals(subscription.getDestination())) {
addMatchedSubscriptionId(sessionIdToSubscriptionIds, sessionId, subscription.getId());
}
});
return sessionsToSubscriptions;
return sessionIdToSubscriptionIds;
}
@NonNull
private List<String> addToList(String subscriptionId, @Nullable List<String> subscriptions) {
if (subscriptions == null) {
return Collections.singletonList(subscriptionId);
}
else {
List<String> newSubscriptions = new ArrayList<>(subscriptions.size() + 1);
newSubscriptions.addAll(subscriptions);
newSubscriptions.add(subscriptionId);
return newSubscriptions;
}
private void addMatchedSubscriptionId(
LinkedMultiValueMap<String, String> sessionIdToSubscriptionIds,
String sessionId, String subscriptionId) {
sessionIdToSubscriptionIds.compute(sessionId, (_sessionId, subscriptionIds) -> {
if (subscriptionIds == null) {
return Collections.singletonList(subscriptionId);
}
else {
List<String> result = new ArrayList<>(subscriptionIds.size() + 1);
result.addAll(subscriptionIds);
result.add(subscriptionId);
return result;
}
});
}
private void ensureCacheLimit() {
@ -318,147 +315,140 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
}
public void updateAfterNewSubscription(String destination, boolean isPattern, String sessionId, String subscriptionId) {
if (isPattern) {
public void updateAfterNewSubscription(String sessionId, Subscription subscription) {
if (subscription.isPattern()) {
for (String cachedDestination : this.destinationCache.keySet()) {
if (pathMatcher.match(destination, cachedDestination)) {
addToDestination(cachedDestination, sessionId, subscriptionId);
if (pathMatcher.match(subscription.getDestination(), cachedDestination)) {
addToDestination(cachedDestination, sessionId, subscription.getId());
}
}
}
else {
addToDestination(destination, sessionId, subscriptionId);
addToDestination(subscription.getDestination(), sessionId, subscription.getId());
}
}
private void addToDestination(String destination, String sessionId, String subscriptionId) {
this.destinationCache.computeIfPresent(destination, (dest, sessionsToSubscriptions) -> {
sessionsToSubscriptions = sessionsToSubscriptions.clone();
sessionsToSubscriptions.compute(sessionId, (s, subscriptions) -> addToList(subscriptionId, subscriptions));
return sessionsToSubscriptions;
this.destinationCache.computeIfPresent(destination, (_destination, sessionIdToSubscriptionIds) -> {
sessionIdToSubscriptionIds = sessionIdToSubscriptionIds.clone();
addMatchedSubscriptionId(sessionIdToSubscriptionIds, sessionId, subscriptionId);
return sessionIdToSubscriptionIds;
});
}
public void updateAfterRemovedSubscription(String sessionId, Subscription subscriptionDetail) {
if (subscriptionDetail.isAntPattern()) {
String patternDestination = subscriptionDetail.getDestination();
public void updateAfterRemovedSubscription(String sessionId, Subscription subscription) {
if (subscription.isPattern()) {
String patternDestination = subscription.getDestination();
for (String destination : this.destinationCache.keySet()) {
if (pathMatcher.match(patternDestination, destination)) {
removeInternal(destination, sessionId, subscriptionDetail.getId());
removeInternal(destination, sessionId, subscription.getId());
}
}
}
else {
removeInternal(subscriptionDetail.getDestination(), sessionId, subscriptionDetail.getId());
removeInternal(subscription.getDestination(), sessionId, subscription.getId());
}
}
private void removeInternal(String destination, String sessionId, String subscription) {
this.destinationCache.computeIfPresent(destination, (dest, subscriptions) -> {
subscriptions = subscriptions.clone();
subscriptions.computeIfPresent(sessionId, (session, subs) -> {
/* it is very likely that one session has only one subscription per one destination */
if (subs.size() == 1 && subscription.equals(subs.get(0))) {
private void removeInternal(String destination, String sessionId, String subscriptionId) {
this.destinationCache.computeIfPresent(destination, (_destination, sessionIdToSubscriptionIds) -> {
sessionIdToSubscriptionIds = sessionIdToSubscriptionIds.clone();
sessionIdToSubscriptionIds.computeIfPresent(sessionId, (_sessionId, subscriptionIds) -> {
/* Most likely case: single subscription per destination per session. */
if (subscriptionIds.size() == 1 && subscriptionId.equals(subscriptionIds.get(0))) {
return null;
}
else {
subs = new ArrayList<>(subs);
subs.remove(subscription);
return emptyListToNUll(subs);
}
subscriptionIds = new ArrayList<>(subscriptionIds);
subscriptionIds.remove(subscriptionId);
return (subscriptionIds.isEmpty() ? null : subscriptionIds);
});
return subscriptions;
return sessionIdToSubscriptionIds;
});
}
@Nullable
private <T> List<T> emptyListToNUll(@NonNull List<T> list) {
return list.isEmpty() ? null : list;
}
public void updateAfterRemovedSession(String sessionId, Collection<Subscription> subscriptionDetails) {
for (Subscription subscriptionDetail : subscriptionDetails) {
updateAfterRemovedSubscription(sessionId, subscriptionDetail);
public void updateAfterRemovedSession(String sessionId, SessionInfo info) {
for (Subscription subscription : info.getSubscriptions()) {
updateAfterRemovedSubscription(sessionId, subscription);
}
}
}
/**
* Provide access to session subscriptions by sessionId.
* Registry for all session and their subscriptions.
*/
private static final class SessionSubscriptionRegistry {
private static final class SessionRegistry {
// 'sessionId' -> 'subscriptionId' -> 'destination, selector expression'
private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SessionInfo> sessions = new ConcurrentHashMap<>();
@Nullable
public SessionSubscriptionInfo getSubscriptions(String sessionId) {
public SessionInfo getSession(String sessionId) {
return this.sessions.get(sessionId);
}
public void forEachSubscription(BiConsumer<String, Subscription> consumer) {
this.sessions.forEach((sessionId, subscriptions) ->
subscriptions.getSubscriptions().forEach(subscriptionDetail ->
consumer.accept(sessionId, subscriptionDetail)));
this.sessions.forEach((sessionId, info) ->
info.getSubscriptions().forEach(subscription -> consumer.accept(sessionId, subscription)));
}
public void addSubscription(String sessionId, Subscription subscription) {
SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
info.addSubscription(subscription);
}
@Nullable
public Subscription addSubscription(String sessionId, String subscriptionId, Subscription subscriptionDetail) {
SessionSubscriptionInfo subscriptions = this.sessions.computeIfAbsent(sessionId, s -> new SessionSubscriptionInfo());
return subscriptions.addSubscription(subscriptionId, subscriptionDetail);
}
@Nullable
public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
public SessionInfo removeSubscriptions(String sessionId) {
return this.sessions.remove(sessionId);
}
}
/**
* Hold subscriptions for a session.
* Container for the subscriptions of a session.
*/
private static final class SessionSubscriptionInfo {
private static final class SessionInfo {
private final Map<String, Subscription> subscriptionLookup = new ConcurrentHashMap<>();
// subscriptionId -> Subscription
private final Map<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();
public Collection<Subscription> getSubscriptions() {
return this.subscriptionLookup.values();
return this.subscriptionMap.values();
}
@Nullable
public Subscription getSubscription(String subscriptionId) {
return this.subscriptionLookup.get(subscriptionId);
return this.subscriptionMap.get(subscriptionId);
}
@Nullable
public Subscription addSubscription(String subscriptionId, Subscription subscriptionDetail) {
return this.subscriptionLookup.putIfAbsent(subscriptionId, subscriptionDetail);
public void addSubscription(Subscription subscription) {
this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);
}
@Nullable
public Subscription removeSubscription(String subscriptionId) {
return this.subscriptionLookup.remove(subscriptionId);
return this.subscriptionMap.remove(subscriptionId);
}
}
/**
* Represents a subscription.
*/
private static final class Subscription {
private final String id;
@Nullable
private final Expression selectorExpression;
private final String destination;
private final boolean isAntPattern;
private final boolean isPattern;
public Subscription(String id, @Nullable Expression selector, String destination, boolean isAntPattern) {
@Nullable
private final Expression selector;
public Subscription(String id, String destination, boolean isPattern, @Nullable Expression selector) {
Assert.notNull(id, "Subscription id must not be null");
Assert.notNull(destination, "Subscription destination must not be null");
this.id = id;
this.selectorExpression = selector;
this.selector = selector;
this.destination = destination;
this.isAntPattern = isAntPattern;
this.isPattern = isPattern;
}
public String getId() {
@ -469,18 +459,19 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
return this.destination;
}
public boolean isAntPattern() {
return this.isAntPattern;
public boolean isPattern() {
return this.isPattern;
}
@Nullable
public Expression getSelectorExpression() {
return this.selectorExpression;
public Expression getSelector() {
return this.selector;
}
@Override
public boolean equals(@Nullable Object other) {
return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id)));
return (this == other ||
(other instanceof Subscription && this.id.equals(((Subscription) other).id)));
}
@Override