KAFKA-19116, KAFKA-19258: Handling share group member change events (#19666)
CI / build (push) Waiting to run Details

The PR adds ShareGroupListener which triggers on group changes, such as
when member leaves or group is empty. Such events are specific to
connection on respective broker. There events help to clean specific
states managed for respective member or group in various caches.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-05-14 09:52:47 +01:00 committed by GitHub
parent 89cbafeec1
commit ec70c44362
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 634 additions and 164 deletions

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.share.SharePartitionKey;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* The SharePartitionCache is used to cache the SharePartition objects. The cache is thread-safe.
*/
public class SharePartitionCache {
/**
* The map to store the share group id and the set of topic-partitions for that group.
*/
private final Map<String, Set<TopicIdPartition>> groups;
/**
* The map is used to store the SharePartition objects for each share group topic-partition.
*/
private final Map<SharePartitionKey, SharePartition> partitions;
SharePartitionCache() {
this.groups = new HashMap<>();
this.partitions = new ConcurrentHashMap<>();
}
/**
* Returns the share partition for the given key.
*
* @param partitionKey The key to get the share partition for.
* @return The share partition for the key or null if not found.
*/
public SharePartition get(SharePartitionKey partitionKey) {
return partitions.get(partitionKey);
}
/**
* Returns the set of topic-partitions for the given group id.
*
* @param groupId The group id to get the topic-partitions for.
* @return The set of topic-partitions for the group id.
*/
public synchronized Set<TopicIdPartition> topicIdPartitionsForGroup(String groupId) {
return Set.copyOf(groups.get(groupId));
}
/**
* Removes the share partition from the cache. The method also removes the topic-partition from
* the group map.
*
* @param partitionKey The key to remove.
* @return The removed value or null if not found.
*/
public synchronized SharePartition remove(SharePartitionKey partitionKey) {
groups.computeIfPresent(partitionKey.groupId(), (k, v) -> {
v.remove(partitionKey.topicIdPartition());
return v.isEmpty() ? null : v;
});
return partitions.remove(partitionKey);
}
/**
* Computes the value for the given key if it is not already present in the cache. Method also
* updates the group map with the topic-partition for the group id.
*
* @param partitionKey The key to compute the value for.
* @param mappingFunction The function to compute the value.
* @return The computed or existing value.
*/
public synchronized SharePartition computeIfAbsent(SharePartitionKey partitionKey, Function<SharePartitionKey, SharePartition> mappingFunction) {
groups.computeIfAbsent(partitionKey.groupId(), k -> new HashSet<>()).add(partitionKey.topicIdPartition());
return partitions.computeIfAbsent(partitionKey, mappingFunction);
}
/**
* Returns the set of all share partition keys in the cache. As the cache can't be cleaned without
* marking the share partitions fenced and detaching the partition listener in the replica manager,
* hence rather providing a method to clean the cache directly, this method is provided to fetch
* all the keys in the cache.
*
* @return The set of all share partition keys.
*/
public Set<SharePartitionKey> cachedSharePartitionKeys() {
return partitions.keySet();
}
// Visible for testing. Should not be used outside the test classes.
void put(SharePartitionKey partitionKey, SharePartition sharePartition) {
partitions.put(partitionKey, sharePartition);
}
// Visible for testing.
int size() {
return partitions.size();
}
// Visible for testing.
boolean containsKey(SharePartitionKey partitionKey) {
return partitions.containsKey(partitionKey);
}
// Visible for testing.
boolean isEmpty() {
return partitions.isEmpty();
}
// Visible for testing.
synchronized Map<String, Set<TopicIdPartition>> groups() {
return Map.copyOf(groups);
}
}

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareGroupListener;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
@ -72,7 +73,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -85,9 +85,9 @@ public class SharePartitionManager implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SharePartitionManager.class);
/**
* The partition cache map is used to store the SharePartition objects for each share group topic-partition.
* The partition cache is used to store the SharePartition objects for each share group topic-partition.
*/
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private final SharePartitionCache partitionCache;
/**
* The replica manager is used to fetch messages from the log.
@ -159,7 +159,7 @@ public class SharePartitionManager implements AutoCloseable {
this(replicaManager,
time,
cache,
new ConcurrentHashMap<>(),
new SharePartitionCache(),
defaultRecordLockDurationMs,
maxDeliveryCount,
maxInFlightMessages,
@ -174,7 +174,7 @@ public class SharePartitionManager implements AutoCloseable {
ReplicaManager replicaManager,
Time time,
ShareSessionCache cache,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
SharePartitionCache partitionCache,
int defaultRecordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
@ -186,7 +186,7 @@ public class SharePartitionManager implements AutoCloseable {
this(replicaManager,
time,
cache,
partitionCacheMap,
partitionCache,
defaultRecordLockDurationMs,
new SystemTimerReaper("share-group-lock-timeout-reaper",
new SystemTimer("share-group-lock-timeout")),
@ -204,7 +204,7 @@ public class SharePartitionManager implements AutoCloseable {
ReplicaManager replicaManager,
Time time,
ShareSessionCache cache,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
SharePartitionCache partitionCache,
int defaultRecordLockDurationMs,
Timer timer,
int maxDeliveryCount,
@ -217,7 +217,7 @@ public class SharePartitionManager implements AutoCloseable {
this.replicaManager = replicaManager;
this.time = time;
this.cache = cache;
this.partitionCacheMap = partitionCacheMap;
this.partitionCache = partitionCache;
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = timer;
this.maxDeliveryCount = maxDeliveryCount;
@ -226,6 +226,7 @@ public class SharePartitionManager implements AutoCloseable {
this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = shareGroupMetrics;
this.brokerTopicStats = brokerTopicStats;
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
}
/**
@ -287,7 +288,7 @@ public class SharePartitionManager implements AutoCloseable {
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
topics.add(topicIdPartition.topic());
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
SharePartition sharePartition = partitionCache.get(sharePartitionKey);
if (sharePartition != null) {
CompletableFuture<Throwable> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
@ -363,7 +364,7 @@ public class SharePartitionManager implements AutoCloseable {
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
SharePartition sharePartition = partitionCache.get(sharePartitionKey);
if (sharePartition == null) {
log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
@ -652,14 +653,14 @@ public class SharePartitionManager implements AutoCloseable {
}
private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
return partitionCacheMap.computeIfAbsent(sharePartitionKey,
return partitionCache.computeIfAbsent(sharePartitionKey,
k -> {
int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
// Attach listener to Partition which shall invoke partition change handlers.
// However, as there could be multiple share partitions (per group name) for a single topic-partition,
// hence create separate listeners per share partition which holds the share partition key
// to identify the respective share partition.
SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCacheMap);
SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCache);
replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener);
return new SharePartition(
sharePartitionKey.groupId(),
@ -691,7 +692,7 @@ public class SharePartitionManager implements AutoCloseable {
}
// Remove the partition from the cache as it's failed to initialize.
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager);
// The partition initialization failed, so add the partition to the erroneous partitions.
log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable);
shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
@ -711,7 +712,7 @@ public class SharePartitionManager implements AutoCloseable {
// The share partition is fenced hence remove the partition from map and let the client retry.
// But surface the error to the client so client might take some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager);
}
};
}
@ -722,10 +723,10 @@ public class SharePartitionManager implements AutoCloseable {
private static void removeSharePartitionFromCache(
SharePartitionKey sharePartitionKey,
Map<SharePartitionKey, SharePartition> map,
SharePartitionCache partitionCache,
ReplicaManager replicaManager
) {
SharePartition sharePartition = map.remove(sharePartitionKey);
SharePartition sharePartition = partitionCache.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
@ -758,21 +759,16 @@ public class SharePartitionManager implements AutoCloseable {
synchronized (cache) {
cache.removeAllSessions();
}
Set<SharePartitionKey> sharePartitionKeys = new HashSet<>(partitionCacheMap.keySet());
Set<SharePartitionKey> sharePartitionKeys = partitionCache.cachedSharePartitionKeys();
// Remove all share partitions from partition cache.
sharePartitionKeys.forEach(sharePartitionKey ->
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager)
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager)
);
} else {
cache.updateSupportsShareGroups(true);
}
}
// Visible for testing.
protected int partitionCacheSize() {
return partitionCacheMap.size();
}
/**
* The SharePartitionListener is used to listen for partition events. The share partition is associated with
* the topic-partition, we need to handle the partition events for the share partition.
@ -785,16 +781,16 @@ public class SharePartitionManager implements AutoCloseable {
private final SharePartitionKey sharePartitionKey;
private final ReplicaManager replicaManager;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private final SharePartitionCache partitionCache;
SharePartitionListener(
SharePartitionKey sharePartitionKey,
ReplicaManager replicaManager,
Map<SharePartitionKey, SharePartition> partitionCacheMap
SharePartitionCache partitionCache
) {
this.sharePartitionKey = sharePartitionKey;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.partitionCache = partitionCache;
}
@Override
@ -824,7 +820,34 @@ public class SharePartitionManager implements AutoCloseable {
topicPartition, sharePartitionKey);
return;
}
removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager);
}
}
/**
* The ShareGroupListenerImpl is used to listen for group events. The share group is associated
* with the group id, need to handle the group events for the share group.
*/
private class ShareGroupListenerImpl implements ShareGroupListener {
@Override
public void onMemberLeave(String groupId, Uuid memberId) {
releaseSession(groupId, memberId.toString());
}
@Override
public void onGroupEmpty(String groupId) {
// Remove all share partitions from the cache. Instead of defining an API in SharePartitionCache
// for removing all share partitions for a group, share partitions are removed after fetching
// associated topic-partitions from the cache. This is done to mark the share partitions fenced
// and remove the listeners from the replica manager.
Set<TopicIdPartition> topicIdPartitions = partitionCache.topicIdPartitionsForGroup(groupId);
if (topicIdPartitions != null) {
// Remove all share partitions from partition cache.
topicIdPartitions.forEach(topicIdPartition ->
removeSharePartitionFromCache(new SharePartitionKey(groupId, topicIdPartition), partitionCache, replicaManager)
);
}
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.share.SharePartitionKey;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SharePartitionCacheTest {
private static final String GROUP_ID = "test-group";
private static final Uuid TOPIC_ID = Uuid.randomUuid();
private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("test-topic", 1));
private static final SharePartitionKey SHARE_PARTITION_KEY = new SharePartitionKey(GROUP_ID, TOPIC_ID_PARTITION);
private SharePartitionCache cache;
@BeforeEach
public void setUp() {
cache = new SharePartitionCache();
}
@Test
public void testComputeIfAbsent() {
// Test computeIfAbsent when key doesn't exist
SharePartition sharePartition = Mockito.mock(SharePartition.class);
SharePartition newPartition = cache.computeIfAbsent(SHARE_PARTITION_KEY, key -> sharePartition);
assertEquals(sharePartition, newPartition);
assertEquals(sharePartition, cache.get(SHARE_PARTITION_KEY));
assertEquals(1, cache.groups().size());
// Test computeIfAbsent when key exists
SharePartition anotherPartition = Mockito.mock(SharePartition.class);
SharePartition existingPartition = cache.computeIfAbsent(SHARE_PARTITION_KEY, key -> anotherPartition);
assertEquals(sharePartition, existingPartition);
assertEquals(sharePartition, cache.get(SHARE_PARTITION_KEY));
assertEquals(1, cache.groups().size());
}
@Test
public void testRemoveGroup() {
// Add partitions for multiple groups
String group1 = "group1";
String group2 = "group2";
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic1", 1));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic2", 2));
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test-topic3", 3));
// Group1 with 2 partitions.
SharePartitionKey key1 = new SharePartitionKey(group1, tp1);
SharePartitionKey key2 = new SharePartitionKey(group1, tp2);
// Group2 with 1 partition.
SharePartitionKey key3 = new SharePartitionKey(group2, tp3);
SharePartition sp1 = Mockito.mock(SharePartition.class);
SharePartition sp2 = Mockito.mock(SharePartition.class);
SharePartition sp3 = Mockito.mock(SharePartition.class);
// Test computeIfAbsent adds to group map
cache.computeIfAbsent(key1, k -> sp1);
cache.computeIfAbsent(key2, k -> sp2);
cache.computeIfAbsent(key3, k -> sp3);
// Verify partitions are in the cache.
assertEquals(3, cache.size());
assertTrue(cache.containsKey(key1));
assertTrue(cache.containsKey(key2));
assertTrue(cache.containsKey(key3));
// Verify groups are in the group map.
assertEquals(2, cache.groups().size());
assertTrue(cache.groups().containsKey(group1));
assertTrue(cache.groups().containsKey(group2));
// Verify topic partitions are in the group map.
assertEquals(2, cache.groups().get(group1).size());
assertEquals(1, cache.groups().get(group2).size());
assertEquals(1, cache.groups().get(group1).stream().filter(tp -> tp.equals(tp1)).count());
assertEquals(1, cache.groups().get(group1).stream().filter(tp -> tp.equals(tp2)).count());
assertEquals(1, cache.groups().get(group2).stream().filter(tp -> tp.equals(tp3)).count());
// Remove one group and verify only its partitions are removed.
cache.topicIdPartitionsForGroup(group1).forEach(
topicIdPartition -> cache.remove(new SharePartitionKey(group1, topicIdPartition)));
assertEquals(1, cache.size());
assertTrue(cache.containsKey(key3));
assertEquals(1, cache.groups().size());
assertTrue(cache.groups().containsKey(group2));
}
}

View File

@ -1005,11 +1005,11 @@ public class SharePartitionManagerTest {
SharePartition sp3 = mock(SharePartition.class);
// Mock the share partitions corresponding to the topic partitions.
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
// Mock the share partitions to get initialized instantaneously without any error.
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
@ -1048,7 +1048,7 @@ public class SharePartitionManagerTest {
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.build();
doAnswer(invocation -> {
@ -1123,8 +1123,8 @@ public class SharePartitionManagerTest {
when(sp0.maybeAcquireFetchLock(any())).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -1133,7 +1133,7 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -1237,13 +1237,13 @@ public class SharePartitionManagerTest {
partitionMap.add(new CachedSharePartition(tp3));
when(shareSession.partitionMap()).thenReturn(partitionMap);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1259,7 +1259,7 @@ public class SharePartitionManagerTest {
assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_RECORD_STATE.code(), result.get(tp2).errorCode());
assertEquals("Unable to release acquired records for the batch", result.get(tp2).errorMessage());
// tp3 was not a part of partitionCacheMap.
// tp3 was not a part of partitionCache.
assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp3).errorMessage());
@ -1380,11 +1380,11 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1425,14 +1425,14 @@ public class SharePartitionManagerTest {
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
when(sp3.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withShareGroupMetrics(shareGroupMetrics)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1498,13 +1498,13 @@ public class SharePartitionManagerTest {
when(sp1.acknowledge(memberId, ack1)).thenReturn(CompletableFuture.completedFuture(null));
when(sp2.acknowledge(memberId, ack2)).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withShareGroupMetrics(shareGroupMetrics)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1540,11 +1540,11 @@ public class SharePartitionManagerTest {
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.withShareGroupMetrics(shareGroupMetrics)
.build();
@ -1584,10 +1584,10 @@ public class SharePartitionManagerTest {
when(sp.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(FutureUtils.failedFuture(
new InvalidRequestException("Member is not the owner of batch record")
));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -1666,9 +1666,9 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
@ -1700,7 +1700,7 @@ public class SharePartitionManagerTest {
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -1776,10 +1776,10 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).acknowledge(ArgumentMatchers.eq(memberId), any());
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
@ -1809,7 +1809,7 @@ public class SharePartitionManagerTest {
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -1881,9 +1881,9 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
@ -1913,7 +1913,7 @@ public class SharePartitionManagerTest {
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withCache(cache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
@ -1986,10 +1986,10 @@ public class SharePartitionManagerTest {
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp3).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
@ -2019,7 +2019,7 @@ public class SharePartitionManagerTest {
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withCache(cache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
@ -2067,8 +2067,8 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> topicIdPartitions = List.of(tp0);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture = new CompletableFuture<>();
@ -2085,7 +2085,7 @@ public class SharePartitionManagerTest {
when(time.hiResClockMs()).thenReturn(100L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withShareGroupMetrics(shareGroupMetrics)
@ -2132,9 +2132,9 @@ public class SharePartitionManagerTest {
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
// Keep the initialization future pending, so fetch request is stuck.
CompletableFuture<Void> pendingInitializationFuture1 = new CompletableFuture<>();
@ -2155,7 +2155,7 @@ public class SharePartitionManagerTest {
when(time.hiResClockMs()).thenReturn(100L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTime(time)
.withShareGroupMetrics(shareGroupMetrics)
@ -2200,8 +2200,8 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> topicIdPartitions = List.of(tp0);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Keep the 2 initialization futures pending and 1 completed with leader not available exception.
CompletableFuture<Void> pendingInitializationFuture1 = new CompletableFuture<>();
@ -2218,7 +2218,7 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -2274,8 +2274,8 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> topicIdPartitions = List.of(tp0);
SharePartition sp0 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -2284,7 +2284,7 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -2305,7 +2305,7 @@ public class SharePartitionManagerTest {
assertTrue(future.join().isEmpty());
Mockito.verify(sp0, times(0)).markFenced();
// Verify that the share partition is still in the cache on LeaderNotAvailableException.
assertEquals(1, partitionCacheMap.size());
assertEquals(1, partitionCache.size());
// Return IllegalStateException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new IllegalStateException("Illegal state")));
@ -2317,10 +2317,10 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
Mockito.verify(sp0, times(1)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return CoordinatorNotAvailableException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new CoordinatorNotAvailableException("Coordinator not available")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
@ -2331,10 +2331,10 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
Mockito.verify(sp0, times(2)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return InvalidRequestException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new InvalidRequestException("Invalid request")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
@ -2345,10 +2345,10 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, "Invalid request");
Mockito.verify(sp0, times(3)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return FencedStateEpochException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new FencedStateEpochException("Fenced state epoch")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
@ -2359,10 +2359,10 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
Mockito.verify(sp0, times(4)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return NotLeaderOrFollowerException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new NotLeaderOrFollowerException("Not leader or follower")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
@ -2373,10 +2373,10 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
Mockito.verify(sp0, times(5)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// The last exception removes the share partition from the cache hence re-add the share partition to cache.
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Return RuntimeException to simulate initialization failure.
when(sp0.maybeInitialize()).thenReturn(FutureUtils.failedFuture(new RuntimeException("Runtime exception")));
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
@ -2387,7 +2387,7 @@ public class SharePartitionManagerTest {
() -> "Processing in delayed share fetch queue never ended.");
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
Mockito.verify(sp0, times(6)).markFenced();
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// Should have 7 fetch recorded and 6 failures as 1 fetch was waiting on initialization and
// didn't error out.
validateBrokerTopicStatsMetrics(
@ -2404,13 +2404,13 @@ public class SharePartitionManagerTest {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
List<TopicIdPartition> topicIdPartitions = List.of(tp0);
Map<SharePartitionKey, SharePartition> partitionCacheMap = (Map<SharePartitionKey, SharePartition>) mock(Map.class);
SharePartitionCache partitionCache = mock(SharePartitionCache.class);
// Throw the exception for first fetch request. Return share partition for next.
when(partitionCacheMap.computeIfAbsent(any(), any()))
when(partitionCache.computeIfAbsent(any(), any()))
.thenThrow(new RuntimeException("Error creating instance"));
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -2437,7 +2437,7 @@ public class SharePartitionManagerTest {
List<TopicIdPartition> topicIdPartitions = List.of(tp0);
// Send map to check no share partition is created.
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
SharePartitionCache partitionCache = new SharePartitionCache();
// Validate when partition is not the leader.
Partition partition = mock(Partition.class);
when(partition.isLeader()).thenReturn(false);
@ -2450,7 +2450,7 @@ public class SharePartitionManagerTest {
.thenReturn(partition);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.build();
@ -2463,7 +2463,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// Validate when partition is not leader.
future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0,
@ -2473,7 +2473,7 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// Should have 2 fetch recorded and 2 failure.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
@ -2503,8 +2503,8 @@ public class SharePartitionManagerTest {
// Mock share partition for tp1, so it can succeed.
SharePartition sp1 = mock(SharePartition.class);
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
when(sp1.maybeAcquireFetchLock(any())).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
@ -2513,7 +2513,7 @@ public class SharePartitionManagerTest {
// Fail initialization for tp2.
SharePartition sp2 = mock(SharePartition.class);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
when(sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new FencedStateEpochException("Fenced state epoch")));
Timer mockTimer = systemTimerReaper();
@ -2528,7 +2528,7 @@ public class SharePartitionManagerTest {
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withBrokerTopicStats(brokerTopicStats)
.withTimer(mockTimer)
.build();
@ -2573,8 +2573,8 @@ public class SharePartitionManagerTest {
when(sp0.maybeAcquireFetchLock(any())).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -2585,7 +2585,7 @@ public class SharePartitionManagerTest {
doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -2596,7 +2596,7 @@ public class SharePartitionManagerTest {
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
// Verify that the share partition is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
assertEquals(1, partitionCache.size());
// Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache.
doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -2604,7 +2604,7 @@ public class SharePartitionManagerTest {
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// Should have 2 fetch recorded and 2 failures.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
@ -2633,9 +2633,9 @@ public class SharePartitionManagerTest {
when(sp1.maybeAcquireFetchLock(any())).thenReturn(false);
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
Timer mockTimer = systemTimerReaper();
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
@ -2647,7 +2647,7 @@ public class SharePartitionManagerTest {
doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
@ -2658,21 +2658,21 @@ public class SharePartitionManagerTest {
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
// Verify that tp1 is still in the cache on exception.
assertEquals(1, partitionCacheMap.size());
assertEquals(sp1, partitionCacheMap.get(new SharePartitionKey(groupId, tp1)));
assertEquals(1, partitionCache.size());
assertEquals(sp1, partitionCache.get(new SharePartitionKey(groupId, tp1)));
// Make sp1 acquirable and add sp0 back in partition cache. Both share partitions should be
// removed from the cache.
when(sp1.maybeAcquireFetchLock(any())).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
// Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache.
doThrow(new FencedStateEpochException("Fenced exception again")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0,
MAX_FETCH_RECORDS, BATCH_SIZE, topicIdPartitions);
validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
assertTrue(partitionCacheMap.isEmpty());
assertTrue(partitionCache.isEmpty());
// Should have 4 fetch recorded (2 fetch and 2 topics) and 3 failures as sp1 was not acquired
// in first fetch and shall have empty response. Similarly, tp0 should record 2 failures and
// tp1 should record 1 failure.
@ -2719,33 +2719,33 @@ public class SharePartitionManagerTest {
public void testSharePartitionListenerOnFailed() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onFailed);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onFailed);
}
@Test
public void testSharePartitionListenerOnDeleted() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onDeleted);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onDeleted);
}
@Test
public void testSharePartitionListenerOnBecomingFollower() {
SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCacheMap);
testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower);
SharePartitionListener partitionListener = new SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, partitionListener::onBecomingFollower);
}
@Test
@ -2832,26 +2832,26 @@ public class SharePartitionManagerTest {
SharePartition sp3 = mock(SharePartition.class);
// Mock the share partitions corresponding to the topic partitions.
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(
new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0))), sp0
);
partitionCacheMap.put(
partitionCache.put(
new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0))), sp1
);
partitionCacheMap.put(
partitionCache.put(
new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0))), sp2
);
partitionCacheMap.put(
partitionCache.put(
new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3
);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withPartitionCacheMap(partitionCache)
.build();
assertEquals(4, sharePartitionManager.partitionCacheSize());
assertEquals(4, partitionCache.size());
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
// Because we are toggling to a share version which does not support share groups, the cache inside share partitions must be cleared.
assertEquals(0, sharePartitionManager.partitionCacheSize());
assertEquals(0, partitionCache.size());
//Check if all share partitions have been fenced.
Mockito.verify(sp0).markFenced();
Mockito.verify(sp1).markFenced();
@ -2873,7 +2873,7 @@ public class SharePartitionManagerTest {
private void testSharePartitionListener(
SharePartitionKey sharePartitionKey,
Map<SharePartitionKey, SharePartition> partitionCacheMap,
SharePartitionCache partitionCache,
ReplicaManager mockReplicaManager,
Consumer<TopicPartition> listenerConsumer
) {
@ -2884,22 +2884,22 @@ public class SharePartitionManagerTest {
SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
partitionCacheMap.put(sharePartitionKey, sp0);
partitionCacheMap.put(spk, sp1);
partitionCache.put(sharePartitionKey, sp0);
partitionCache.put(spk, sp1);
// Invoke listener for first share partition.
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
// Validate that the share partition is removed from the cache.
assertEquals(1, partitionCacheMap.size());
assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
assertEquals(1, partitionCache.size());
assertFalse(partitionCache.containsKey(sharePartitionKey));
verify(sp0, times(1)).markFenced();
verify(mockReplicaManager, times(1)).removeListener(any(), any());
// Invoke listener for non-matching share partition.
listenerConsumer.accept(tp);
// The non-matching share partition should not be removed as the listener is attached to a different topic partition.
assertEquals(1, partitionCacheMap.size());
assertEquals(1, partitionCache.size());
verify(sp1, times(0)).markFenced();
// Verify the remove listener is not called for the second share partition.
verify(mockReplicaManager, times(1)).removeListener(any(), any());
@ -3069,7 +3069,7 @@ public class SharePartitionManagerTest {
private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, true);
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private SharePartitionCache partitionCache = new SharePartitionCache();
private Timer timer = new MockTimer();
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
private BrokerTopicStats brokerTopicStats;
@ -3089,8 +3089,8 @@ public class SharePartitionManagerTest {
return this;
}
SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
this.partitionCacheMap = partitionCacheMap;
SharePartitionManagerBuilder withPartitionCacheMap(SharePartitionCache partitionCache) {
this.partitionCache = partitionCache;
return this;
}
@ -3117,7 +3117,7 @@ public class SharePartitionManagerTest {
return new SharePartitionManager(replicaManager,
time,
cache,
partitionCacheMap,
partitionCache,
DEFAULT_RECORD_LOCK_DURATION_MS,
timer,
MAX_DELIVERY_COUNT,

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.share;
import org.apache.kafka.common.Uuid;
/**
* The ShareGroupListener is used to notify when there is a change in the share group members.
*/
public interface ShareGroupListener {
/**
* Called when member leaves the group.
*
* @param groupId The id of the group.
* @param memberId The id of the member.
*/
void onMemberLeave(String groupId, Uuid memberId);
/**
* Called when the group is empty.
*
* @param groupId The id of the group.
*/
void onGroupEmpty(String groupId);
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.network.ConnectionDisconnectListener;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareGroupListener;
import com.yammer.metrics.core.Meter;
@ -52,19 +53,35 @@ public class ShareSessionCache {
* Metric for the rate of eviction of share sessions.
*/
private final Meter evictionsMeter;
private final int maxEntries;
private long numPartitions = 0;
/**
* The listener for connection disconnect events for the client.
*/
private final ConnectionDisconnectListener connectionDisconnectListener;
// A map of session key to ShareSession.
/**
* Map of session key to ShareSession.
*/
private final Map<ShareSessionKey, ShareSession> sessions = new HashMap<>();
/**
* Map of groupId to number of members in the group.
*/
private final Map<String, Integer> numMembersPerGroup = new HashMap<>();
/**
* The map to store the client connection id to session key. This is used to remove the session
* from the cache when the respective client disconnects.
*/
private final Map<String, ShareSessionKey> connectionIdToSessionMap;
/**
* Flag indicating if share groups have been turned on.
*/
private final AtomicBoolean supportsShareGroups;
/**
* The listener for share group events. This is used to notify the listener when the group members
* change.
*/
private ShareGroupListener shareGroupListener;
private final int maxEntries;
private long numPartitions = 0;
@SuppressWarnings("this-escape")
public ShareSessionCache(int maxEntries, boolean supportsShareGroups) {
@ -101,6 +118,7 @@ public class ShareSessionCache {
*/
public synchronized void removeAllSessions() {
sessions.clear();
numMembersPerGroup.clear();
numPartitions = 0;
}
@ -115,6 +133,41 @@ public class ShareSessionCache {
return null;
}
/**
* Maybe remove the session and notify listeners. This is called when the connection is disconnected
* for the client. The session may have already been removed by the client as part of final epoch,
* hence check if the session is still present in the cache.
*
* @param key The share session key.
*/
public synchronized void maybeRemoveAndNotifyListeners(ShareSessionKey key) {
ShareSession session = get(key);
if (session != null) {
// Notify the share group listener that member has left the group. Notify listener prior
// removing the session from the cache to ensure that the listener has access to the session
// while it is still in the cache.
if (shareGroupListener != null) {
shareGroupListener.onMemberLeave(key.groupId(), key.memberId());
}
// As session is not null hence it's removed as part of connection disconnect. Hence,
// update the evictions metric.
evictionsMeter.mark();
// Try removing session if not already removed. The listener might have removed the session
// already.
remove(session);
}
// Notify the share group listener if the group is empty. This should be checked regardless
// session is evicted by connection disconnect or client's final epoch.
int numMembers = numMembersPerGroup.getOrDefault(key.groupId(), 0);
if (numMembers == 0) {
// Remove the group from the map as it is empty.
numMembersPerGroup.remove(key.groupId());
if (shareGroupListener != null) {
shareGroupListener.onGroupEmpty(key.groupId());
}
}
}
/**
* Remove an entry from the session cache.
*
@ -125,6 +178,7 @@ public class ShareSessionCache {
ShareSession removeResult = sessions.remove(session.key());
if (removeResult != null) {
numPartitions = numPartitions - session.cachedSize();
numMembersPerGroup.compute(session.key().groupId(), (k, v) -> v != null ? v - 1 : 0);
}
return removeResult;
}
@ -159,6 +213,7 @@ public class ShareSessionCache {
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
sessions.put(session.key(), session);
updateNumPartitions(session);
numMembersPerGroup.compute(session.key().groupId(), (k, v) -> v != null ? v + 1 : 1);
connectionIdToSessionMap.put(clientConnectionId, session.key());
return session.key();
}
@ -169,11 +224,20 @@ public class ShareSessionCache {
return connectionDisconnectListener;
}
public synchronized void registerShareGroupListener(ShareGroupListener shareGroupListener) {
this.shareGroupListener = shareGroupListener;
}
// Visible for testing.
Meter evictionsMeter() {
return evictionsMeter;
}
// Visible for testing.
Integer numMembers(String groupId) {
return numMembersPerGroup.get(groupId);
}
private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be removed from the cache.
@ -181,11 +245,10 @@ public class ShareSessionCache {
public void onDisconnect(String connectionId) {
ShareSessionKey shareSessionKey = connectionIdToSessionMap.remove(connectionId);
if (shareSessionKey != null) {
// Remove the session from the cache.
ShareSession removedSession = remove(shareSessionKey);
if (removedSession != null) {
evictionsMeter.mark();
}
// Try removing session and notify listeners. The session might already be removed
// as part of final epoch from client, so we need to check if the session is still
// present in the cache.
maybeRemoveAndNotifyListeners(shareSessionKey);
}
}
}

View File

@ -30,6 +30,10 @@ public class ShareSessionKey {
this.memberId = Objects.requireNonNull(memberId);
}
public String groupId() {
return groupId;
}
public Uuid memberId() {
return memberId;
}

View File

@ -19,10 +19,12 @@ package org.apache.kafka.server.share.session;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareGroupListener;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Iterator;
import java.util.List;
@ -154,6 +156,102 @@ public class ShareSessionCacheTest {
assertEquals(0, cache.totalPartitions());
}
@Test
public void testShareGroupListenerEvents() {
ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class);
ShareSessionCache cache = new ShareSessionCache(3, true);
cache.registerShareGroupListener(mockListener);
String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
Uuid memberId2 = Uuid.randomUuid();
ShareSessionKey key1 = cache.maybeCreateSession(groupId, memberId1, mockedSharePartitionMap(1), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession(groupId, memberId2, mockedSharePartitionMap(1), "conn-2");
// Verify member count is tracked
assertEquals(2, cache.size());
assertNotNull(cache.get(key1));
assertNotNull(cache.get(key2));
assertEquals(2, cache.numMembers(groupId));
// Remove session and verify listener are not called as connection disconnect listener didn't
// remove the session.
cache.remove(key1);
Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId, memberId1);
Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId);
// Verify member count is updated
assertEquals(1, cache.numMembers(groupId));
// Re-create session for memberId1.
cache.maybeCreateSession(groupId, memberId1, mockedSharePartitionMap(1), "conn-1");
assertEquals(2, cache.numMembers(groupId));
// Simulate connection disconnect for memberId1.
cache.connectionDisconnectListener().onDisconnect("conn-1");
// Verify only member leave event is triggered for memberId1. Empty group event should not be triggered.
Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId1);
Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId, memberId2);
Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId);
assertEquals(1, cache.numMembers(groupId));
// Simulate connection disconnect for memberId2.
cache.connectionDisconnectListener().onDisconnect("conn-2");
// Verify both member leave event and empty group event should be triggered.
Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId1);
Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId, memberId2);
Mockito.verify(mockListener, Mockito.times(1)).onGroupEmpty(groupId);
assertNull(cache.numMembers(groupId));
}
@Test
public void testShareGroupListenerEventsMultipleGroups() {
ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class);
ShareSessionCache cache = new ShareSessionCache(3, true);
cache.registerShareGroupListener(mockListener);
String groupId1 = "grp1";
String groupId2 = "grp2";
Uuid memberId1 = Uuid.randomUuid();
Uuid memberId2 = Uuid.randomUuid();
ShareSessionKey key1 = cache.maybeCreateSession(groupId1, memberId1, mockedSharePartitionMap(1), "conn-1");
ShareSessionKey key2 = cache.maybeCreateSession(groupId2, memberId2, mockedSharePartitionMap(1), "conn-2");
// Verify member count is tracked
assertEquals(2, cache.size());
assertNotNull(cache.get(key1));
assertNotNull(cache.get(key2));
assertEquals(1, cache.numMembers(groupId1));
assertEquals(1, cache.numMembers(groupId2));
// Remove session for group1 and verify listeners are only called for group1.
cache.connectionDisconnectListener().onDisconnect("conn-1");
Mockito.verify(mockListener, Mockito.times(1)).onMemberLeave(groupId1, memberId1);
Mockito.verify(mockListener, Mockito.times(1)).onGroupEmpty(groupId1);
// Listener should not be called for group2.
Mockito.verify(mockListener, Mockito.times(0)).onMemberLeave(groupId2, memberId2);
Mockito.verify(mockListener, Mockito.times(0)).onGroupEmpty(groupId2);
// Verify member count is updated.
assertNull(cache.numMembers(groupId1));
assertEquals(1, cache.numMembers(groupId2));
}
@Test
public void testNoShareGroupListenerRegistered() {
ShareSessionCache cache = new ShareSessionCache(3, true);
String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
ShareSessionKey key = cache.maybeCreateSession(groupId, memberId, mockedSharePartitionMap(1), "conn-1");
// Verify member count is still tracked even without listener
assertEquals(1, cache.numMembers(groupId));
assertNotNull(cache.get(key));
// Remove session should not throw any exceptions.
cache.connectionDisconnectListener().onDisconnect("conn-1");
assertNull(cache.numMembers(groupId));
}
private ImplicitLinkedHashCollection<CachedSharePartition> mockedSharePartitionMap(int size) {
ImplicitLinkedHashCollection<CachedSharePartition> cacheMap = new
ImplicitLinkedHashCollection<>(size);