diff --git a/core/src/main/java/kafka/server/share/SharePartitionCache.java b/core/src/main/java/kafka/server/share/SharePartitionCache.java new file mode 100644 index 00000000000..5e35830ef3b --- /dev/null +++ b/core/src/main/java/kafka/server/share/SharePartitionCache.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.share.SharePartitionKey; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +/** + * The SharePartitionCache is used to cache the SharePartition objects. The cache is thread-safe. + */ +public class SharePartitionCache { + + /** + * The map to store the share group id and the set of topic-partitions for that group. + */ + private final Map> groups; + + /** + * The map is used to store the SharePartition objects for each share group topic-partition. + */ + private final Map partitions; + + SharePartitionCache() { + this.groups = new HashMap<>(); + this.partitions = new ConcurrentHashMap<>(); + } + + /** + * Returns the share partition for the given key. + * + * @param partitionKey The key to get the share partition for. + * @return The share partition for the key or null if not found. + */ + public SharePartition get(SharePartitionKey partitionKey) { + return partitions.get(partitionKey); + } + + /** + * Returns the set of topic-partitions for the given group id. + * + * @param groupId The group id to get the topic-partitions for. + * @return The set of topic-partitions for the group id. + */ + public synchronized Set topicIdPartitionsForGroup(String groupId) { + return Set.copyOf(groups.get(groupId)); + } + + /** + * Removes the share partition from the cache. The method also removes the topic-partition from + * the group map. + * + * @param partitionKey The key to remove. + * @return The removed value or null if not found. + */ + public synchronized SharePartition remove(SharePartitionKey partitionKey) { + groups.computeIfPresent(partitionKey.groupId(), (k, v) -> { + v.remove(partitionKey.topicIdPartition()); + return v.isEmpty() ? null : v; + }); + return partitions.remove(partitionKey); + } + + /** + * Computes the value for the given key if it is not already present in the cache. Method also + * updates the group map with the topic-partition for the group id. + * + * @param partitionKey The key to compute the value for. + * @param mappingFunction The function to compute the value. + * @return The computed or existing value. + */ + public synchronized SharePartition computeIfAbsent(SharePartitionKey partitionKey, Function mappingFunction) { + groups.computeIfAbsent(partitionKey.groupId(), k -> new HashSet<>()).add(partitionKey.topicIdPartition()); + return partitions.computeIfAbsent(partitionKey, mappingFunction); + } + + /** + * Returns the set of all share partition keys in the cache. As the cache can't be cleaned without + * marking the share partitions fenced and detaching the partition listener in the replica manager, + * hence rather providing a method to clean the cache directly, this method is provided to fetch + * all the keys in the cache. + * + * @return The set of all share partition keys. + */ + public Set cachedSharePartitionKeys() { + return partitions.keySet(); + } + + // Visible for testing. Should not be used outside the test classes. + void put(SharePartitionKey partitionKey, SharePartition sharePartition) { + partitions.put(partitionKey, sharePartition); + } + + // Visible for testing. + int size() { + return partitions.size(); + } + + // Visible for testing. + boolean containsKey(SharePartitionKey partitionKey) { + return partitions.containsKey(partitionKey); + } + + // Visible for testing. + boolean isEmpty() { + return partitions.isEmpty(); + } + + // Visible for testing. + synchronized Map> groups() { + return Map.copyOf(groups); + } +} diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 88321c1b87e..e2e58dc3f69 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.server.common.ShareVersion; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ShareGroupListener; import org.apache.kafka.server.share.SharePartitionKey; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; import org.apache.kafka.server.share.context.FinalContext; @@ -72,7 +73,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -85,9 +85,9 @@ public class SharePartitionManager implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(SharePartitionManager.class); /** - * The partition cache map is used to store the SharePartition objects for each share group topic-partition. + * The partition cache is used to store the SharePartition objects for each share group topic-partition. */ - private final Map partitionCacheMap; + private final SharePartitionCache partitionCache; /** * The replica manager is used to fetch messages from the log. @@ -159,7 +159,7 @@ public class SharePartitionManager implements AutoCloseable { this(replicaManager, time, cache, - new ConcurrentHashMap<>(), + new SharePartitionCache(), defaultRecordLockDurationMs, maxDeliveryCount, maxInFlightMessages, @@ -174,7 +174,7 @@ public class SharePartitionManager implements AutoCloseable { ReplicaManager replicaManager, Time time, ShareSessionCache cache, - Map partitionCacheMap, + SharePartitionCache partitionCache, int defaultRecordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, @@ -186,7 +186,7 @@ public class SharePartitionManager implements AutoCloseable { this(replicaManager, time, cache, - partitionCacheMap, + partitionCache, defaultRecordLockDurationMs, new SystemTimerReaper("share-group-lock-timeout-reaper", new SystemTimer("share-group-lock-timeout")), @@ -204,7 +204,7 @@ public class SharePartitionManager implements AutoCloseable { ReplicaManager replicaManager, Time time, ShareSessionCache cache, - Map partitionCacheMap, + SharePartitionCache partitionCache, int defaultRecordLockDurationMs, Timer timer, int maxDeliveryCount, @@ -217,7 +217,7 @@ public class SharePartitionManager implements AutoCloseable { this.replicaManager = replicaManager; this.time = time; this.cache = cache; - this.partitionCacheMap = partitionCacheMap; + this.partitionCache = partitionCache; this.defaultRecordLockDurationMs = defaultRecordLockDurationMs; this.timer = timer; this.maxDeliveryCount = maxDeliveryCount; @@ -226,6 +226,7 @@ public class SharePartitionManager implements AutoCloseable { this.groupConfigManager = groupConfigManager; this.shareGroupMetrics = shareGroupMetrics; this.brokerTopicStats = brokerTopicStats; + this.cache.registerShareGroupListener(new ShareGroupListenerImpl()); } /** @@ -287,7 +288,7 @@ public class SharePartitionManager implements AutoCloseable { acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { topics.add(topicIdPartition.topic()); SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); - SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); + SharePartition sharePartition = partitionCache.get(sharePartitionKey); if (sharePartition != null) { CompletableFuture future = new CompletableFuture<>(); sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> { @@ -363,7 +364,7 @@ public class SharePartitionManager implements AutoCloseable { Map> futuresMap = new HashMap<>(); topicIdPartitions.forEach(topicIdPartition -> { SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); - SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); + SharePartition sharePartition = partitionCache.get(sharePartitionKey); if (sharePartition == null) { log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition); futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())); @@ -652,14 +653,14 @@ public class SharePartitionManager implements AutoCloseable { } private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) { - return partitionCacheMap.computeIfAbsent(sharePartitionKey, + return partitionCache.computeIfAbsent(sharePartitionKey, k -> { int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition()); // Attach listener to Partition which shall invoke partition change handlers. // However, as there could be multiple share partitions (per group name) for a single topic-partition, // hence create separate listeners per share partition which holds the share partition key // to identify the respective share partition. - SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCacheMap); + SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCache); replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener); return new SharePartition( sharePartitionKey.groupId(), @@ -691,7 +692,7 @@ public class SharePartitionManager implements AutoCloseable { } // Remove the partition from the cache as it's failed to initialize. - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); + removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager); // The partition initialization failed, so add the partition to the erroneous partitions. log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable); shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable); @@ -711,7 +712,7 @@ public class SharePartitionManager implements AutoCloseable { // The share partition is fenced hence remove the partition from map and let the client retry. // But surface the error to the client so client might take some action i.e. re-fetch // the metadata and retry the fetch on new leader. - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); + removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager); } }; } @@ -722,10 +723,10 @@ public class SharePartitionManager implements AutoCloseable { private static void removeSharePartitionFromCache( SharePartitionKey sharePartitionKey, - Map map, + SharePartitionCache partitionCache, ReplicaManager replicaManager ) { - SharePartition sharePartition = map.remove(sharePartitionKey); + SharePartition sharePartition = partitionCache.remove(sharePartitionKey); if (sharePartition != null) { sharePartition.markFenced(); replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener()); @@ -758,21 +759,16 @@ public class SharePartitionManager implements AutoCloseable { synchronized (cache) { cache.removeAllSessions(); } - Set sharePartitionKeys = new HashSet<>(partitionCacheMap.keySet()); + Set sharePartitionKeys = partitionCache.cachedSharePartitionKeys(); // Remove all share partitions from partition cache. sharePartitionKeys.forEach(sharePartitionKey -> - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager) + removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager) ); } else { cache.updateSupportsShareGroups(true); } } - // Visible for testing. - protected int partitionCacheSize() { - return partitionCacheMap.size(); - } - /** * The SharePartitionListener is used to listen for partition events. The share partition is associated with * the topic-partition, we need to handle the partition events for the share partition. @@ -785,16 +781,16 @@ public class SharePartitionManager implements AutoCloseable { private final SharePartitionKey sharePartitionKey; private final ReplicaManager replicaManager; - private final Map partitionCacheMap; + private final SharePartitionCache partitionCache; SharePartitionListener( SharePartitionKey sharePartitionKey, ReplicaManager replicaManager, - Map partitionCacheMap + SharePartitionCache partitionCache ) { this.sharePartitionKey = sharePartitionKey; this.replicaManager = replicaManager; - this.partitionCacheMap = partitionCacheMap; + this.partitionCache = partitionCache; } @Override @@ -824,7 +820,34 @@ public class SharePartitionManager implements AutoCloseable { topicPartition, sharePartitionKey); return; } - removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager); + removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager); + } + } + + /** + * The ShareGroupListenerImpl is used to listen for group events. The share group is associated + * with the group id, need to handle the group events for the share group. + */ + private class ShareGroupListenerImpl implements ShareGroupListener { + + @Override + public void onMemberLeave(String groupId, Uuid memberId) { + releaseSession(groupId, memberId.toString()); + } + + @Override + public void onGroupEmpty(String groupId) { + // Remove all share partitions from the cache. Instead of defining an API in SharePartitionCache + // for removing all share partitions for a group, share partitions are removed after fetching + // associated topic-partitions from the cache. This is done to mark the share partitions fenced + // and remove the listeners from the replica manager. + Set topicIdPartitions = partitionCache.topicIdPartitionsForGroup(groupId); + if (topicIdPartitions != null) { + // Remove all share partitions from partition cache. + topicIdPartitions.forEach(topicIdPartition -> + removeSharePartitionFromCache(new SharePartitionKey(groupId, topicIdPartition), partitionCache, replicaManager) + ); + } } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionCacheTest.java b/core/src/test/java/kafka/server/share/SharePartitionCacheTest.java new file mode 100644 index 00000000000..0d33d95f65b --- /dev/null +++ b/core/src/test/java/kafka/server/share/SharePartitionCacheTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.share.SharePartitionKey; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SharePartitionCacheTest { + + private static final String GROUP_ID = "test-group"; + private static final Uuid TOPIC_ID = Uuid.randomUuid(); + private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("test-topic", 1)); + private static final SharePartitionKey SHARE_PARTITION_KEY = new SharePartitionKey(GROUP_ID, TOPIC_ID_PARTITION); + + private SharePartitionCache cache; + + @BeforeEach + public void setUp() { + cache = new SharePartitionCache(); + } + + @Test + public void testComputeIfAbsent() { + // Test computeIfAbsent when key doesn't exist + SharePartition sharePartition = Mockito.mock(SharePartition.class); + SharePartition newPartition = cache.computeIfAbsent(SHARE_PARTITION_KEY, key -> sharePartition); + + assertEquals(sharePartition, newPartition); + assertEquals(sharePartition, cache.get(SHARE_PARTITION_KEY)); + assertEquals(1, cache.groups().size()); + + // Test computeIfAbsent when key exists + SharePartition anotherPartition = Mockito.mock(SharePartition.class); + SharePartition existingPartition = cache.computeIfAbsent(SHARE_PARTITION_KEY, key -> anotherPartition); + assertEquals(sharePartition, existingPartition); + assertEquals(sharePartition, cache.get(SHARE_PARTITION_KEY)); + assertEquals(1, cache.groups().size()); + } + + @Test + public void testRemoveGroup() { + // Add partitions for multiple groups + String group1 = "group1"; + String group2 = "group2"; + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic1", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic2", 2)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic3", 3)); + // Group1 with 2 partitions. + SharePartitionKey key1 = new SharePartitionKey(group1, tp1); + SharePartitionKey key2 = new SharePartitionKey(group1, tp2); + // Group2 with 1 partition. + SharePartitionKey key3 = new SharePartitionKey(group2, tp3); + SharePartition sp1 = Mockito.mock(SharePartition.class); + SharePartition sp2 = Mockito.mock(SharePartition.class); + SharePartition sp3 = Mockito.mock(SharePartition.class); + + // Test computeIfAbsent adds to group map + cache.computeIfAbsent(key1, k -> sp1); + cache.computeIfAbsent(key2, k -> sp2); + cache.computeIfAbsent(key3, k -> sp3); + + // Verify partitions are in the cache. + assertEquals(3, cache.size()); + assertTrue(cache.containsKey(key1)); + assertTrue(cache.containsKey(key2)); + assertTrue(cache.containsKey(key3)); + // Verify groups are in the group map. + assertEquals(2, cache.groups().size()); + assertTrue(cache.groups().containsKey(group1)); + assertTrue(cache.groups().containsKey(group2)); + // Verify topic partitions are in the group map. + assertEquals(2, cache.groups().get(group1).size()); + assertEquals(1, cache.groups().get(group2).size()); + assertEquals(1, cache.groups().get(group1).stream().filter(tp -> tp.equals(tp1)).count()); + assertEquals(1, cache.groups().get(group1).stream().filter(tp -> tp.equals(tp2)).count()); + assertEquals(1, cache.groups().get(group2).stream().filter(tp -> tp.equals(tp3)).count()); + + // Remove one group and verify only its partitions are removed. + cache.topicIdPartitionsForGroup(group1).forEach( + topicIdPartition -> cache.remove(new SharePartitionKey(group1, topicIdPartition))); + assertEquals(1, cache.size()); + assertTrue(cache.containsKey(key3)); + assertEquals(1, cache.groups().size()); + assertTrue(cache.groups().containsKey(group2)); + } +} \ No newline at end of file diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 34b8c0c2840..41348a54365 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -1005,11 +1005,11 @@ public class SharePartitionManagerTest { SharePartition sp3 = mock(SharePartition.class); // Mock the share partitions corresponding to the topic partitions. - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); + partitionCache.put(new SharePartitionKey(groupId, tp3), sp3); // Mock the share partitions to get initialized instantaneously without any error. when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); @@ -1048,7 +1048,7 @@ public class SharePartitionManagerTest { .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .build(); doAnswer(invocation -> { @@ -1123,8 +1123,8 @@ public class SharePartitionManagerTest { when(sp0.maybeAcquireFetchLock(any())).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(false); when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); Timer mockTimer = systemTimerReaper(); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( @@ -1133,7 +1133,7 @@ public class SharePartitionManagerTest { mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -1237,13 +1237,13 @@ public class SharePartitionManagerTest { partitionMap.add(new CachedSharePartition(tp3)); when(shareSession.partitionMap()).thenReturn(partitionMap); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); sharePartitionManager = SharePartitionManagerBuilder.builder() .withCache(cache) - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -1259,7 +1259,7 @@ public class SharePartitionManagerTest { assertEquals(2, result.get(tp2).partitionIndex()); assertEquals(Errors.INVALID_RECORD_STATE.code(), result.get(tp2).errorCode()); assertEquals("Unable to release acquired records for the batch", result.get(tp2).errorMessage()); - // tp3 was not a part of partitionCacheMap. + // tp3 was not a part of partitionCache. assertEquals(4, result.get(tp3).partitionIndex()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp3).errorMessage()); @@ -1380,11 +1380,11 @@ public class SharePartitionManagerTest { when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp), sp); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -1425,14 +1425,14 @@ public class SharePartitionManagerTest { when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null)); when(sp3.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); + partitionCache.put(new SharePartitionKey(groupId, tp3), sp3); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withShareGroupMetrics(shareGroupMetrics) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -1498,13 +1498,13 @@ public class SharePartitionManagerTest { when(sp1.acknowledge(memberId, ack1)).thenReturn(CompletableFuture.completedFuture(null)); when(sp2.acknowledge(memberId, ack2)).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withShareGroupMetrics(shareGroupMetrics) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -1540,11 +1540,11 @@ public class SharePartitionManagerTest { TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); SharePartition sp = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp), sp); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .withShareGroupMetrics(shareGroupMetrics) .build(); @@ -1584,10 +1584,10 @@ public class SharePartitionManagerTest { when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(FutureUtils.failedFuture( new InvalidRequestException("Member is not the owner of batch record") )); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp), sp); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -1666,9 +1666,9 @@ public class SharePartitionManagerTest { return CompletableFuture.completedFuture(Optional.empty()); }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any()); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, @@ -1700,7 +1700,7 @@ public class SharePartitionManagerTest { topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -1776,10 +1776,10 @@ public class SharePartitionManagerTest { return CompletableFuture.completedFuture(Optional.empty()); }).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any()); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); + partitionCache.put(new SharePartitionKey(groupId, tp3), sp3); ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, @@ -1809,7 +1809,7 @@ public class SharePartitionManagerTest { topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -1881,9 +1881,9 @@ public class SharePartitionManagerTest { return CompletableFuture.completedFuture(Optional.empty()); }).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, @@ -1913,7 +1913,7 @@ public class SharePartitionManagerTest { topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); sharePartitionManager = spy(SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withCache(cache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) @@ -1986,10 +1986,10 @@ public class SharePartitionManagerTest { return CompletableFuture.completedFuture(Optional.empty()); }).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); + partitionCache.put(new SharePartitionKey(groupId, tp3), sp3); ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, @@ -2019,7 +2019,7 @@ public class SharePartitionManagerTest { topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); sharePartitionManager = spy(SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withCache(cache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) @@ -2067,8 +2067,8 @@ public class SharePartitionManagerTest { List topicIdPartitions = List.of(tp0); SharePartition sp0 = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Keep the initialization future pending, so fetch request is stuck. CompletableFuture pendingInitializationFuture = new CompletableFuture<>(); @@ -2085,7 +2085,7 @@ public class SharePartitionManagerTest { when(time.hiResClockMs()).thenReturn(100L); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTime(time) .withShareGroupMetrics(shareGroupMetrics) @@ -2132,9 +2132,9 @@ public class SharePartitionManagerTest { SharePartition sp0 = mock(SharePartition.class); SharePartition sp1 = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); // Keep the initialization future pending, so fetch request is stuck. CompletableFuture pendingInitializationFuture1 = new CompletableFuture<>(); @@ -2155,7 +2155,7 @@ public class SharePartitionManagerTest { when(time.hiResClockMs()).thenReturn(100L); ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTime(time) .withShareGroupMetrics(shareGroupMetrics) @@ -2200,8 +2200,8 @@ public class SharePartitionManagerTest { List topicIdPartitions = List.of(tp0); SharePartition sp0 = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Keep the 2 initialization futures pending and 1 completed with leader not available exception. CompletableFuture pendingInitializationFuture1 = new CompletableFuture<>(); @@ -2218,7 +2218,7 @@ public class SharePartitionManagerTest { mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -2274,8 +2274,8 @@ public class SharePartitionManagerTest { List topicIdPartitions = List.of(tp0); SharePartition sp0 = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); Timer mockTimer = systemTimerReaper(); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( @@ -2284,7 +2284,7 @@ public class SharePartitionManagerTest { mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -2305,7 +2305,7 @@ public class SharePartitionManagerTest { assertTrue(future.join().isEmpty()); Mockito.verify(sp0, times(0)).markFenced(); // Verify that the share partition is still in the cache on LeaderNotAvailableException. - assertEquals(1, partitionCacheMap.size()); + assertEquals(1, partitionCache.size()); // Return IllegalStateException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state"))); @@ -2317,10 +2317,10 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state"); Mockito.verify(sp0, times(1)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Return CoordinatorNotAvailableException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, @@ -2331,10 +2331,10 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available"); Mockito.verify(sp0, times(2)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Return InvalidRequestException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, @@ -2345,10 +2345,10 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, "Invalid request"); Mockito.verify(sp0, times(3)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Return FencedStateEpochException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, @@ -2359,10 +2359,10 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch"); Mockito.verify(sp0, times(4)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Return NotLeaderOrFollowerException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, @@ -2373,10 +2373,10 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower"); Mockito.verify(sp0, times(5)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Return RuntimeException to simulate initialization failure. when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception"))); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, @@ -2387,7 +2387,7 @@ public class SharePartitionManagerTest { () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception"); Mockito.verify(sp0, times(6)).markFenced(); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // Should have 7 fetch recorded and 6 failures as 1 fetch was waiting on initialization and // didn't error out. validateBrokerTopicStatsMetrics( @@ -2404,13 +2404,13 @@ public class SharePartitionManagerTest { TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); List topicIdPartitions = List.of(tp0); - Map partitionCacheMap = (Map) mock(Map.class); + SharePartitionCache partitionCache = mock(SharePartitionCache.class); // Throw the exception for first fetch request. Return share partition for next. - when(partitionCacheMap.computeIfAbsent(any(), any())) + when(partitionCache.computeIfAbsent(any(), any())) .thenThrow(new RuntimeException("Error creating instance")); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -2437,7 +2437,7 @@ public class SharePartitionManagerTest { List topicIdPartitions = List.of(tp0); // Send map to check no share partition is created. - Map partitionCacheMap = new HashMap<>(); + SharePartitionCache partitionCache = new SharePartitionCache(); // Validate when partition is not the leader. Partition partition = mock(Partition.class); when(partition.isLeader()).thenReturn(false); @@ -2450,7 +2450,7 @@ public class SharePartitionManagerTest { .thenReturn(partition); sharePartitionManager = SharePartitionManagerBuilder.builder() .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .build(); @@ -2463,7 +2463,7 @@ public class SharePartitionManagerTest { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing for delayed share fetch request not finished."); validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception"); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // Validate when partition is not leader. future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, @@ -2473,7 +2473,7 @@ public class SharePartitionManagerTest { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing for delayed share fetch request not finished."); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // Should have 2 fetch recorded and 2 failure. validateBrokerTopicStatsMetrics( brokerTopicStats, @@ -2503,8 +2503,8 @@ public class SharePartitionManagerTest { // Mock share partition for tp1, so it can succeed. SharePartition sp1 = mock(SharePartition.class); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); when(sp1.maybeAcquireFetchLock(any())).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(true); @@ -2513,7 +2513,7 @@ public class SharePartitionManagerTest { // Fail initialization for tp2. SharePartition sp2 = mock(SharePartition.class); - partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); when(sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new FencedStateEpochException("Fenced state epoch"))); Timer mockTimer = systemTimerReaper(); @@ -2528,7 +2528,7 @@ public class SharePartitionManagerTest { sharePartitionManager = SharePartitionManagerBuilder.builder() .withReplicaManager(replicaManager) - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withBrokerTopicStats(brokerTopicStats) .withTimer(mockTimer) .build(); @@ -2573,8 +2573,8 @@ public class SharePartitionManagerTest { when(sp0.maybeAcquireFetchLock(any())).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); Timer mockTimer = systemTimerReaper(); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( @@ -2585,7 +2585,7 @@ public class SharePartitionManagerTest { doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -2596,7 +2596,7 @@ public class SharePartitionManagerTest { MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception"); // Verify that the share partition is still in the cache on exception. - assertEquals(1, partitionCacheMap.size()); + assertEquals(1, partitionCache.size()); // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache. doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); @@ -2604,7 +2604,7 @@ public class SharePartitionManagerTest { future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception"); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // Should have 2 fetch recorded and 2 failures. validateBrokerTopicStatsMetrics( brokerTopicStats, @@ -2633,9 +2633,9 @@ public class SharePartitionManagerTest { when(sp1.maybeAcquireFetchLock(any())).thenReturn(false); when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); - partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); Timer mockTimer = systemTimerReaper(); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( @@ -2647,7 +2647,7 @@ public class SharePartitionManagerTest { doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .withReplicaManager(mockReplicaManager) .withTimer(mockTimer) .withBrokerTopicStats(brokerTopicStats) @@ -2658,21 +2658,21 @@ public class SharePartitionManagerTest { MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions); validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception"); // Verify that tp1 is still in the cache on exception. - assertEquals(1, partitionCacheMap.size()); - assertEquals(sp1, partitionCacheMap.get(new SharePartitionKey(groupId, tp1))); + assertEquals(1, partitionCache.size()); + assertEquals(sp1, partitionCache.get(new SharePartitionKey(groupId, tp1))); // Make sp1 acquirable and add sp0 back in partition cache. Both share partitions should be // removed from the cache. when(sp1.maybeAcquireFetchLock(any())).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(true); - partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); // Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache. doThrow(new FencedStateEpochException("Fenced exception again")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions); validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again"); - assertTrue(partitionCacheMap.isEmpty()); + assertTrue(partitionCache.isEmpty()); // Should have 4 fetch recorded (2 fetch and 2 topics) and 3 failures as sp1 was not acquired // in first fetch and shall have empty response. Similarly, tp0 should record 2 failures and // tp1 should record 1 failure. @@ -2719,33 +2719,33 @@ public class SharePartitionManagerTest { public void testSharePartitionListenerOnFailed() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); - Map partitionCacheMap = new HashMap<>(); + SharePartitionCache partitionCache = new SharePartitionCache(); ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onFailed); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache); + testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onFailed); } @Test public void testSharePartitionListenerOnDeleted() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); - Map partitionCacheMap = new HashMap<>(); + SharePartitionCache partitionCache = new SharePartitionCache(); ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onDeleted); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache); + testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onDeleted); } @Test public void testSharePartitionListenerOnBecomingFollower() { SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))); - Map partitionCacheMap = new HashMap<>(); + SharePartitionCache partitionCache = new SharePartitionCache(); ReplicaManager mockReplicaManager = mock(ReplicaManager.class); - SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap); - testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower); + SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache); + testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onBecomingFollower); } @Test @@ -2832,26 +2832,26 @@ public class SharePartitionManagerTest { SharePartition sp3 = mock(SharePartition.class); // Mock the share partitions corresponding to the topic partitions. - Map partitionCacheMap = new HashMap<>(); - partitionCacheMap.put( + SharePartitionCache partitionCache = new SharePartitionCache(); + partitionCache.put( new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0))), sp0 ); - partitionCacheMap.put( + partitionCache.put( new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0))), sp1 ); - partitionCacheMap.put( + partitionCache.put( new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0))), sp2 ); - partitionCacheMap.put( + partitionCache.put( new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3 ); sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap) + .withPartitionCacheMap(partitionCache) .build(); - assertEquals(4, sharePartitionManager.partitionCacheSize()); + assertEquals(4, partitionCache.size()); sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0); // Because we are toggling to a share version which does not support share groups, the cache inside share partitions must be cleared. - assertEquals(0, sharePartitionManager.partitionCacheSize()); + assertEquals(0, partitionCache.size()); //Check if all share partitions have been fenced. Mockito.verify(sp0).markFenced(); Mockito.verify(sp1).markFenced(); @@ -2873,7 +2873,7 @@ public class SharePartitionManagerTest { private void testSharePartitionListener( SharePartitionKey sharePartitionKey, - Map partitionCacheMap, + SharePartitionCache partitionCache, ReplicaManager mockReplicaManager, Consumer listenerConsumer ) { @@ -2884,22 +2884,22 @@ public class SharePartitionManagerTest { SharePartition sp0 = mock(SharePartition.class); SharePartition sp1 = mock(SharePartition.class); - partitionCacheMap.put(sharePartitionKey, sp0); - partitionCacheMap.put(spk, sp1); + partitionCache.put(sharePartitionKey, sp0); + partitionCache.put(spk, sp1); // Invoke listener for first share partition. listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition()); // Validate that the share partition is removed from the cache. - assertEquals(1, partitionCacheMap.size()); - assertFalse(partitionCacheMap.containsKey(sharePartitionKey)); + assertEquals(1, partitionCache.size()); + assertFalse(partitionCache.containsKey(sharePartitionKey)); verify(sp0, times(1)).markFenced(); verify(mockReplicaManager, times(1)).removeListener(any(), any()); // Invoke listener for non-matching share partition. listenerConsumer.accept(tp); // The non-matching share partition should not be removed as the listener is attached to a different topic partition. - assertEquals(1, partitionCacheMap.size()); + assertEquals(1, partitionCache.size()); verify(sp1, times(0)).markFenced(); // Verify the remove listener is not called for the second share partition. verify(mockReplicaManager, times(1)).removeListener(any(), any()); @@ -3069,7 +3069,7 @@ public class SharePartitionManagerTest { private ReplicaManager replicaManager = mock(ReplicaManager.class); private Time time = new MockTime(); private ShareSessionCache cache = new ShareSessionCache(10, true); - private Map partitionCacheMap = new HashMap<>(); + private SharePartitionCache partitionCache = new SharePartitionCache(); private Timer timer = new MockTimer(); private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time); private BrokerTopicStats brokerTopicStats; @@ -3089,8 +3089,8 @@ public class SharePartitionManagerTest { return this; } - SharePartitionManagerBuilder withPartitionCacheMap(Map partitionCacheMap) { - this.partitionCacheMap = partitionCacheMap; + SharePartitionManagerBuilder withPartitionCacheMap(SharePartitionCache partitionCache) { + this.partitionCache = partitionCache; return this; } @@ -3117,7 +3117,7 @@ public class SharePartitionManagerTest { return new SharePartitionManager(replicaManager, time, cache, - partitionCacheMap, + partitionCache, DEFAULT_RECORD_LOCK_DURATION_MS, timer, MAX_DELIVERY_COUNT, diff --git a/server/src/main/java/org/apache/kafka/server/share/ShareGroupListener.java b/server/src/main/java/org/apache/kafka/server/share/ShareGroupListener.java new file mode 100644 index 00000000000..9ae3df57a96 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/share/ShareGroupListener.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share; + +import org.apache.kafka.common.Uuid; + +/** + * The ShareGroupListener is used to notify when there is a change in the share group members. + */ +public interface ShareGroupListener { + + /** + * Called when member leaves the group. + * + * @param groupId The id of the group. + * @param memberId The id of the member. + */ + void onMemberLeave(String groupId, Uuid memberId); + + /** + * Called when the group is empty. + * + * @param groupId The id of the group. + */ + void onGroupEmpty(String groupId); +} diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java index e57a7c8ed9c..b3558558227 100644 --- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java +++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.network.ConnectionDisconnectListener; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ShareGroupListener; import com.yammer.metrics.core.Meter; @@ -52,19 +53,35 @@ public class ShareSessionCache { * Metric for the rate of eviction of share sessions. */ private final Meter evictionsMeter; - - private final int maxEntries; - private long numPartitions = 0; + /** + * The listener for connection disconnect events for the client. + */ private final ConnectionDisconnectListener connectionDisconnectListener; - - // A map of session key to ShareSession. + /** + * Map of session key to ShareSession. + */ private final Map sessions = new HashMap<>(); - + /** + * Map of groupId to number of members in the group. + */ + private final Map numMembersPerGroup = new HashMap<>(); + /** + * The map to store the client connection id to session key. This is used to remove the session + * from the cache when the respective client disconnects. + */ private final Map connectionIdToSessionMap; /** * Flag indicating if share groups have been turned on. */ private final AtomicBoolean supportsShareGroups; + /** + * The listener for share group events. This is used to notify the listener when the group members + * change. + */ + private ShareGroupListener shareGroupListener; + + private final int maxEntries; + private long numPartitions = 0; @SuppressWarnings("this-escape") public ShareSessionCache(int maxEntries, boolean supportsShareGroups) { @@ -101,6 +118,7 @@ public class ShareSessionCache { */ public synchronized void removeAllSessions() { sessions.clear(); + numMembersPerGroup.clear(); numPartitions = 0; } @@ -115,6 +133,41 @@ public class ShareSessionCache { return null; } + /** + * Maybe remove the session and notify listeners. This is called when the connection is disconnected + * for the client. The session may have already been removed by the client as part of final epoch, + * hence check if the session is still present in the cache. + * + * @param key The share session key. + */ + public synchronized void maybeRemoveAndNotifyListeners(ShareSessionKey key) { + ShareSession session = get(key); + if (session != null) { + // Notify the share group listener that member has left the group. Notify listener prior + // removing the session from the cache to ensure that the listener has access to the session + // while it is still in the cache. + if (shareGroupListener != null) { + shareGroupListener.onMemberLeave(key.groupId(), key.memberId()); + } + // As session is not null hence it's removed as part of connection disconnect. Hence, + // update the evictions metric. + evictionsMeter.mark(); + // Try removing session if not already removed. The listener might have removed the session + // already. + remove(session); + } + // Notify the share group listener if the group is empty. This should be checked regardless + // session is evicted by connection disconnect or client's final epoch. + int numMembers = numMembersPerGroup.getOrDefault(key.groupId(), 0); + if (numMembers == 0) { + // Remove the group from the map as it is empty. + numMembersPerGroup.remove(key.groupId()); + if (shareGroupListener != null) { + shareGroupListener.onGroupEmpty(key.groupId()); + } + } + } + /** * Remove an entry from the session cache. * @@ -125,6 +178,7 @@ public class ShareSessionCache { ShareSession removeResult = sessions.remove(session.key()); if (removeResult != null) { numPartitions = numPartitions - session.cachedSize(); + numMembersPerGroup.compute(session.key().groupId(), (k, v) -> v != null ? v - 1 : 0); } return removeResult; } @@ -159,6 +213,7 @@ public class ShareSessionCache { ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)); sessions.put(session.key(), session); updateNumPartitions(session); + numMembersPerGroup.compute(session.key().groupId(), (k, v) -> v != null ? v + 1 : 1); connectionIdToSessionMap.put(clientConnectionId, session.key()); return session.key(); } @@ -169,11 +224,20 @@ public class ShareSessionCache { return connectionDisconnectListener; } + public synchronized void registerShareGroupListener(ShareGroupListener shareGroupListener) { + this.shareGroupListener = shareGroupListener; + } + // Visible for testing. Meter evictionsMeter() { return evictionsMeter; } + // Visible for testing. + Integer numMembers(String groupId) { + return numMembersPerGroup.get(groupId); + } + private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener { // When the client disconnects, the corresponding session should be removed from the cache. @@ -181,11 +245,10 @@ public class ShareSessionCache { public void onDisconnect(String connectionId) { ShareSessionKey shareSessionKey = connectionIdToSessionMap.remove(connectionId); if (shareSessionKey != null) { - // Remove the session from the cache. - ShareSession removedSession = remove(shareSessionKey); - if (removedSession != null) { - evictionsMeter.mark(); - } + // Try removing session and notify listeners. The session might already be removed + // as part of final epoch from client, so we need to check if the session is still + // present in the cache. + maybeRemoveAndNotifyListeners(shareSessionKey); } } } diff --git a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java index c7709857f5c..8064bf92899 100644 --- a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java +++ b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionKey.java @@ -30,6 +30,10 @@ public class ShareSessionKey { this.memberId = Objects.requireNonNull(memberId); } + public String groupId() { + return groupId; + } + public Uuid memberId() { return memberId; } diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java index 7022fbb59fa..f2e840b1f07 100644 --- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java @@ -19,10 +19,12 @@ package org.apache.kafka.server.share.session; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.server.share.CachedSharePartition; +import org.apache.kafka.server.share.ShareGroupListener; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.Iterator; import java.util.List; @@ -154,6 +156,102 @@ public class ShareSessionCacheTest { assertEquals(0, cache.totalPartitions()); } + @Test + public void testShareGroupListenerEvents() { + ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class); + ShareSessionCache cache = new ShareSessionCache(3, true); + cache.registerShareGroupListener(mockListener); + + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + Uuid memberId2 = Uuid.randomUuid(); + ShareSessionKey key1 = cache.maybeCreateSession(groupId, memberId1, mockedSharePartitionMap(1), "conn-1"); + ShareSessionKey key2 = cache.maybeCreateSession(groupId, memberId2, mockedSharePartitionMap(1), "conn-2"); + + // Verify member count is tracked + assertEquals(2, cache.size()); + assertNotNull(cache.get(key1)); + assertNotNull(cache.get(key2)); + assertEquals(2, cache.numMembers(groupId)); + + // Remove session and verify listener are not called as connection disconnect listener didn't + // remove the session. + cache.remove(key1); + Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId, memberId1); + Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId); + // Verify member count is updated + assertEquals(1, cache.numMembers(groupId)); + + // Re-create session for memberId1. + cache.maybeCreateSession(groupId, memberId1, mockedSharePartitionMap(1), "conn-1"); + assertEquals(2, cache.numMembers(groupId)); + + // Simulate connection disconnect for memberId1. + cache.connectionDisconnectListener().onDisconnect("conn-1"); + // Verify only member leave event is triggered for memberId1. Empty group event should not be triggered. + Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId1); + Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId, memberId2); + Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId); + assertEquals(1, cache.numMembers(groupId)); + + // Simulate connection disconnect for memberId2. + cache.connectionDisconnectListener().onDisconnect("conn-2"); + // Verify both member leave event and empty group event should be triggered. + Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId1); + Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId2); + Mockito.verify(mockListener, Mockito.times(1)).onGroupEmpty(groupId); + assertNull(cache.numMembers(groupId)); + } + + @Test + public void testShareGroupListenerEventsMultipleGroups() { + ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class); + ShareSessionCache cache = new ShareSessionCache(3, true); + cache.registerShareGroupListener(mockListener); + + String groupId1 = "grp1"; + String groupId2 = "grp2"; + Uuid memberId1 = Uuid.randomUuid(); + Uuid memberId2 = Uuid.randomUuid(); + ShareSessionKey key1 = cache.maybeCreateSession(groupId1, memberId1, mockedSharePartitionMap(1), "conn-1"); + ShareSessionKey key2 = cache.maybeCreateSession(groupId2, memberId2, mockedSharePartitionMap(1), "conn-2"); + + // Verify member count is tracked + assertEquals(2, cache.size()); + assertNotNull(cache.get(key1)); + assertNotNull(cache.get(key2)); + assertEquals(1, cache.numMembers(groupId1)); + assertEquals(1, cache.numMembers(groupId2)); + + // Remove session for group1 and verify listeners are only called for group1. + cache.connectionDisconnectListener().onDisconnect("conn-1"); + Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId1, memberId1); + Mockito.verify(mockListener, Mockito.times(1)).onGroupEmpty(groupId1); + // Listener should not be called for group2. + Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId2, memberId2); + Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId2); + // Verify member count is updated. + assertNull(cache.numMembers(groupId1)); + assertEquals(1, cache.numMembers(groupId2)); + } + + @Test + public void testNoShareGroupListenerRegistered() { + ShareSessionCache cache = new ShareSessionCache(3, true); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + ShareSessionKey key = cache.maybeCreateSession(groupId, memberId, mockedSharePartitionMap(1), "conn-1"); + + // Verify member count is still tracked even without listener + assertEquals(1, cache.numMembers(groupId)); + assertNotNull(cache.get(key)); + + // Remove session should not throw any exceptions. + cache.connectionDisconnectListener().onDisconnect("conn-1"); + assertNull(cache.numMembers(groupId)); + } + private ImplicitLinkedHashCollection mockedSharePartitionMap(int size) { ImplicitLinkedHashCollection cacheMap = new ImplicitLinkedHashCollection<>(size);