Avoid ConcurrentModificationException
Removal of cached destination is now moved outside the for loop that removes subscriptions to avoid ConcurrentModificationException. Also since updateCache is a LinkedHashMap with accessOrder=true, a simple access with updateCache.get() modify the map. By iterating over updateCache.entrySet(), we avoid this update. Issue: SPR-11755
This commit is contained in:
parent
426b77b834
commit
98738c0bbb
|
@ -55,7 +55,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||||
private PathMatcher pathMatcher = new AntPathMatcher();
|
private PathMatcher pathMatcher = new AntPathMatcher();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the maximum number of entries for the resolved destination cache.
|
* Specify the maximum number of entries for the resolved destination cache.
|
||||||
* Default is 1024.
|
* Default is 1024.
|
||||||
|
@ -115,7 +114,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
|
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
|
||||||
MultiValueMap<String,String> result = this.destinationCache.getSubscriptions(destination);
|
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -129,7 +128,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(!result.isEmpty()) {
|
if (!result.isEmpty()) {
|
||||||
this.destinationCache.addSubscriptions(destination, result);
|
this.destinationCache.addSubscriptions(destination, result);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
@ -175,10 +174,11 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
|
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
|
||||||
synchronized(this.updateCache) {
|
synchronized (this.updateCache) {
|
||||||
for (String cachedDestination : this.updateCache.keySet()) {
|
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
|
||||||
|
String cachedDestination = entry.getKey();
|
||||||
if (getPathMatcher().match(destination, cachedDestination)) {
|
if (getPathMatcher().match(destination, cachedDestination)) {
|
||||||
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
|
MultiValueMap<String, String> subs = entry.getValue();
|
||||||
subs.add(sessionId, subsId);
|
subs.add(sessionId, subsId);
|
||||||
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
|
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
|
||||||
}
|
}
|
||||||
|
@ -187,43 +187,53 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
|
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
|
||||||
synchronized(this.updateCache) {
|
synchronized (this.updateCache) {
|
||||||
for (String cachedDestination : this.updateCache.keySet()) {
|
Set<String> destinationsToRemove = new HashSet<String>();
|
||||||
|
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
|
||||||
|
String cachedDestination = entry.getKey();
|
||||||
if (getPathMatcher().match(destination, cachedDestination)) {
|
if (getPathMatcher().match(destination, cachedDestination)) {
|
||||||
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
|
MultiValueMap<String, String> subs = entry.getValue();
|
||||||
List<String> subsIds = subs.get(sessionId);
|
List<String> subsIds = subs.get(sessionId);
|
||||||
subsIds.remove(subsId);
|
subsIds.remove(subsId);
|
||||||
if (subsIds.isEmpty()) {
|
if (subsIds.isEmpty()) {
|
||||||
subs.remove(sessionId);
|
subs.remove(sessionId);
|
||||||
}
|
}
|
||||||
if (subs.isEmpty()) {
|
if (subs.isEmpty()) {
|
||||||
this.updateCache.remove(cachedDestination);
|
destinationsToRemove.add(cachedDestination);
|
||||||
this.accessCache.remove(cachedDestination);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
|
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (String d : destinationsToRemove) {
|
||||||
|
this.updateCache.remove(d);
|
||||||
|
this.accessCache.remove(d);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
|
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
|
||||||
synchronized(this.updateCache) {
|
synchronized (this.updateCache) {
|
||||||
for (String destination : info.getDestinations()) {
|
for (String destination : info.getDestinations()) {
|
||||||
for (String cachedDestination : this.updateCache.keySet()) {
|
Set<String> destinationsToRemove = new HashSet<String>();
|
||||||
|
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
|
||||||
|
String cachedDestination = entry.getKey();
|
||||||
if (getPathMatcher().match(destination, cachedDestination)) {
|
if (getPathMatcher().match(destination, cachedDestination)) {
|
||||||
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
|
MultiValueMap<String, String> subs = entry.getValue();
|
||||||
subs.remove(info.getSessionId());
|
subs.remove(info.getSessionId());
|
||||||
if (subs.isEmpty()) {
|
if (subs.isEmpty()) {
|
||||||
this.updateCache.remove(cachedDestination);
|
destinationsToRemove.add(cachedDestination);
|
||||||
this.accessCache.remove(cachedDestination);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this.accessCache.put(cachedDestination,new LinkedMultiValueMap<String, String>(subs));
|
this.accessCache.put(cachedDestination,new LinkedMultiValueMap<String, String>(subs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (String d : destinationsToRemove) {
|
||||||
|
this.updateCache.remove(d);
|
||||||
|
this.accessCache.remove(d);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,6 +185,41 @@ public class DefaultSubscriptionRegistryTests {
|
||||||
assertEquals(0, actual.size());
|
assertEquals(0, actual.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SPR-11755
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void registerAndUnregisterMultipleDestinations() {
|
||||||
|
|
||||||
|
String sess1 = "sess01";
|
||||||
|
String sess2 = "sess02";
|
||||||
|
|
||||||
|
String subs1 = "subs01";
|
||||||
|
String subs2 = "subs02";
|
||||||
|
String subs3 = "subs03";
|
||||||
|
String subs4 = "subs04";
|
||||||
|
String subs5 = "subs05";
|
||||||
|
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
|
||||||
|
|
||||||
|
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NYSE.IBM"));
|
||||||
|
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.GOOG"));
|
||||||
|
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
|
||||||
|
|
||||||
|
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
|
||||||
|
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
|
||||||
|
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs3));
|
||||||
|
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess1, subs4, "/topic/PRICE.STOCK.NYSE.IBM"));
|
||||||
|
this.registry.registerSubscription(subscribeMessage(sess2, subs5, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
|
||||||
|
this.registry.unregisterAllSubscriptions(sess1);
|
||||||
|
this.registry.unregisterAllSubscriptions(sess2);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void registerSubscriptionWithDestinationPatternRegex() {
|
public void registerSubscriptionWithDestinationPatternRegex() {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue