From 98738c0bbb9a19cef2ea06aa35efa3713147908c Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 5 May 2014 09:43:13 +0200 Subject: [PATCH] Avoid ConcurrentModificationException Removal of cached destination is now moved outside the for loop that removes subscriptions to avoid ConcurrentModificationException. Also since updateCache is a LinkedHashMap with accessOrder=true, a simple access with updateCache.get() modify the map. By iterating over updateCache.entrySet(), we avoid this update. Issue: SPR-11755 --- .../broker/DefaultSubscriptionRegistry.java | 42 ++++++++++++------- .../DefaultSubscriptionRegistryTests.java | 35 ++++++++++++++++ 2 files changed, 61 insertions(+), 16 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index 309a36feb8..f228099f4f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -55,7 +55,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { private PathMatcher pathMatcher = new AntPathMatcher(); - /** * Specify the maximum number of entries for the resolved destination cache. * Default is 1024. @@ -115,7 +114,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override protected MultiValueMap findSubscriptionsInternal(String destination, Message message) { - MultiValueMap result = this.destinationCache.getSubscriptions(destination); + MultiValueMap result = this.destinationCache.getSubscriptions(destination); if (result != null) { return result; } @@ -129,7 +128,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } } } - if(!result.isEmpty()) { + if (!result.isEmpty()) { this.destinationCache.addSubscriptions(destination, result); } return result; @@ -175,10 +174,11 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { - synchronized(this.updateCache) { - for (String cachedDestination : this.updateCache.keySet()) { + synchronized (this.updateCache) { + for (Map.Entry> entry : this.updateCache.entrySet()) { + String cachedDestination = entry.getKey(); if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subs = this.updateCache.get(cachedDestination); + MultiValueMap subs = entry.getValue(); subs.add(sessionId, subsId); this.accessCache.put(cachedDestination, new LinkedMultiValueMap(subs)); } @@ -187,43 +187,53 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) { - synchronized(this.updateCache) { - for (String cachedDestination : this.updateCache.keySet()) { + synchronized (this.updateCache) { + Set destinationsToRemove = new HashSet(); + for (Map.Entry> entry : this.updateCache.entrySet()) { + String cachedDestination = entry.getKey(); if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subs = this.updateCache.get(cachedDestination); + MultiValueMap subs = entry.getValue(); List subsIds = subs.get(sessionId); subsIds.remove(subsId); if (subsIds.isEmpty()) { subs.remove(sessionId); } if (subs.isEmpty()) { - this.updateCache.remove(cachedDestination); - this.accessCache.remove(cachedDestination); + destinationsToRemove.add(cachedDestination); } else { this.accessCache.put(cachedDestination, new LinkedMultiValueMap(subs)); } } } + for (String d : destinationsToRemove) { + this.updateCache.remove(d); + this.accessCache.remove(d); + } } } public void updateAfterRemovedSession(SessionSubscriptionInfo info) { - synchronized(this.updateCache) { + synchronized (this.updateCache) { for (String destination : info.getDestinations()) { - for (String cachedDestination : this.updateCache.keySet()) { + Set destinationsToRemove = new HashSet(); + for (Map.Entry> entry : this.updateCache.entrySet()) { + String cachedDestination = entry.getKey(); if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subs = this.updateCache.get(cachedDestination); + MultiValueMap subs = entry.getValue(); subs.remove(info.getSessionId()); if (subs.isEmpty()) { - this.updateCache.remove(cachedDestination); - this.accessCache.remove(cachedDestination); + destinationsToRemove.add(cachedDestination); } else { this.accessCache.put(cachedDestination,new LinkedMultiValueMap(subs)); } } } + for (String d : destinationsToRemove) { + this.updateCache.remove(d); + this.accessCache.remove(d); + } } } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java index 633f06a6ff..ce7b7aaa97 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java @@ -185,6 +185,41 @@ public class DefaultSubscriptionRegistryTests { assertEquals(0, actual.size()); } + // SPR-11755 + + @Test + public void registerAndUnregisterMultipleDestinations() { + + String sess1 = "sess01"; + String sess2 = "sess02"; + + String subs1 = "subs01"; + String subs2 = "subs02"; + String subs3 = "subs03"; + String subs4 = "subs04"; + String subs5 = "subs05"; + + this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG")); + + this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NYSE.IBM")); + this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.GOOG")); + this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM")); + + this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1)); + this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2)); + this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs3)); + + this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM")); + this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG")); + this.registry.registerSubscription(subscribeMessage(sess1, subs4, "/topic/PRICE.STOCK.NYSE.IBM")); + this.registry.registerSubscription(subscribeMessage(sess2, subs5, "/topic/PRICE.STOCK.NASDAQ.GOOG")); + this.registry.unregisterAllSubscriptions(sess1); + this.registry.unregisterAllSubscriptions(sess2); + } + @Test public void registerSubscriptionWithDestinationPatternRegex() {