mirror of https://github.com/apache/kafka.git
MINOR: Cleaning up code for share feature listener (#19715)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
The PR is a minor follow up on https://github.com/apache/kafka/pull/19659. KafkaApis.scala already have a check which denies new share fetch related calls if the share group is not supported. Hence no new sessions shall be created so the requirement to have share group enabled flag in ShareSessionCache is not needed, unless I am missing something. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
a1008dc85d
commit
6d45657cde
|
@ -63,7 +63,7 @@ public class SharePartitionCache {
|
|||
* @return The set of topic-partitions for the group id.
|
||||
*/
|
||||
public synchronized Set<TopicIdPartition> topicIdPartitionsForGroup(String groupId) {
|
||||
return Set.copyOf(groups.get(groupId));
|
||||
return groups.containsKey(groupId) ? Set.copyOf(groups.get(groupId)) : Set.of();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -532,6 +532,24 @@ public class SharePartitionManager implements AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The handler for share version feature metadata changes.
|
||||
* @param shareVersion the new share version feature
|
||||
* @param isEnabledFromConfig whether the share version feature is enabled from config
|
||||
*/
|
||||
public void onShareVersionToggle(ShareVersion shareVersion, boolean isEnabledFromConfig) {
|
||||
// Clear the cache and remove all share partitions from the cache if the share version does
|
||||
// not support share groups.
|
||||
if (!shareVersion.supportsShareGroups() && !isEnabledFromConfig) {
|
||||
cache.removeAllSessions();
|
||||
Set<SharePartitionKey> sharePartitionKeys = partitionCache.cachedSharePartitionKeys();
|
||||
// Remove all share partitions from partition cache.
|
||||
sharePartitionKeys.forEach(sharePartitionKey ->
|
||||
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session.
|
||||
*
|
||||
|
@ -748,27 +766,6 @@ public class SharePartitionManager implements AutoCloseable {
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* The handler for share version feature metadata changes.
|
||||
* @param shareVersion the new share version feature
|
||||
*/
|
||||
public void onShareVersionToggle(ShareVersion shareVersion) {
|
||||
if (!shareVersion.supportsShareGroups()) {
|
||||
cache.updateSupportsShareGroups(false);
|
||||
// Remove all share sessions from share session cache.
|
||||
synchronized (cache) {
|
||||
cache.removeAllSessions();
|
||||
}
|
||||
Set<SharePartitionKey> sharePartitionKeys = partitionCache.cachedSharePartitionKeys();
|
||||
// Remove all share partitions from partition cache.
|
||||
sharePartitionKeys.forEach(sharePartitionKey ->
|
||||
removeSharePartitionFromCache(sharePartitionKey, partitionCache, replicaManager)
|
||||
);
|
||||
} else {
|
||||
cache.updateSupportsShareGroups(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The SharePartitionListener is used to listen for partition events. The share partition is associated with
|
||||
* the topic-partition, we need to handle the partition events for the share partition.
|
||||
|
|
|
@ -45,8 +45,7 @@ import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
|
|||
import org.apache.kafka.metadata.publisher.AclPublisher
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, FinalizedFeatures, NodeToControllerChannelManager, ShareVersion, TopicIdPartition}
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
|
||||
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
|
||||
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
|
||||
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
|
||||
|
@ -261,10 +260,7 @@ class BrokerServer(
|
|||
)
|
||||
|
||||
val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
|
||||
config.shareGroupConfig.shareGroupMaxShareSessions(),
|
||||
ShareVersion.fromFeatureLevel(
|
||||
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)
|
||||
).supportsShareGroups()
|
||||
config.shareGroupConfig.shareGroupMaxShareSessions()
|
||||
)
|
||||
|
||||
val connectionDisconnectListeners = Seq(
|
||||
|
@ -487,6 +483,7 @@ class BrokerServer(
|
|||
groupCoordinator,
|
||||
transactionCoordinator,
|
||||
shareCoordinator,
|
||||
sharePartitionManager,
|
||||
new DynamicConfigPublisher(
|
||||
config,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
|
@ -522,8 +519,7 @@ class BrokerServer(
|
|||
authorizerPlugin.toJava
|
||||
),
|
||||
sharedServer.initialBrokerMetadataLoadFaultHandler,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
sharePartitionManager
|
||||
sharedServer.metadataPublishingFaultHandler
|
||||
)
|
||||
// If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out
|
||||
// or are shutting down before we could catch up. Therefore, also fail the firstPublishFuture.
|
||||
|
|
|
@ -74,6 +74,7 @@ class BrokerMetadataPublisher(
|
|||
groupCoordinator: GroupCoordinator,
|
||||
txnCoordinator: TransactionCoordinator,
|
||||
shareCoordinator: ShareCoordinator,
|
||||
sharePartitionManager: SharePartitionManager,
|
||||
var dynamicConfigPublisher: DynamicConfigPublisher,
|
||||
dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
|
||||
dynamicTopicClusterQuotaPublisher: DynamicTopicClusterQuotaPublisher,
|
||||
|
@ -81,8 +82,7 @@ class BrokerMetadataPublisher(
|
|||
delegationTokenPublisher: DelegationTokenPublisher,
|
||||
aclPublisher: AclPublisher,
|
||||
fatalFaultHandler: FaultHandler,
|
||||
metadataPublishingFaultHandler: FaultHandler,
|
||||
sharePartitionManager: SharePartitionManager
|
||||
metadataPublishingFaultHandler: FaultHandler
|
||||
) extends MetadataPublisher with Logging {
|
||||
logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
|
||||
|
||||
|
@ -254,16 +254,17 @@ class BrokerMetadataPublisher(
|
|||
if (delta.featuresDelta != null) {
|
||||
try {
|
||||
val newFinalizedFeatures = new FinalizedFeatures(newImage.features.metadataVersionOrThrow, newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
|
||||
val newFinalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)
|
||||
// Share version feature has been toggled.
|
||||
if (newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort) != finalizedShareVersion) {
|
||||
finalizedShareVersion = newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME, 0.toShort)
|
||||
if (newFinalizedShareVersion != finalizedShareVersion) {
|
||||
finalizedShareVersion = newFinalizedShareVersion
|
||||
val shareVersion: ShareVersion = ShareVersion.fromFeatureLevel(finalizedShareVersion)
|
||||
info(s"Feature share.version has been updated to version $finalizedShareVersion")
|
||||
sharePartitionManager.onShareVersionToggle(shareVersion)
|
||||
sharePartitionManager.onShareVersionToggle(shareVersion, config.shareGroupConfig.isShareGroupEnabled)
|
||||
}
|
||||
} catch {
|
||||
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share partition manager " +
|
||||
s" with share version feature change in $delta", t)
|
||||
s" with share version feature change in $deltaName", t)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -186,7 +186,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testNewContextReturnsFinalContextWithoutRequestData() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -213,7 +213,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testNewContextReturnsFinalContextWithRequestData() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -243,7 +243,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -274,7 +274,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testNewContext() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -435,7 +435,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testZeroSizeShareSession() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -481,7 +481,7 @@ public class SharePartitionManagerTest {
|
|||
@Test
|
||||
public void testToForgetPartitions() {
|
||||
String groupId = "grp";
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -519,7 +519,7 @@ public class SharePartitionManagerTest {
|
|||
@Test
|
||||
public void testShareSessionUpdateTopicIdsBrokerSide() {
|
||||
String groupId = "grp";
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -570,7 +570,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testGetErroneousAndValidTopicIdPartitions() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -663,7 +663,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testShareFetchContextResponseSize() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -764,7 +764,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testCachedTopicPartitionsWithNoTopicPartitions() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -775,7 +775,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
@Test
|
||||
public void testCachedTopicPartitionsForValidShareSessions() {
|
||||
ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.build();
|
||||
|
@ -1048,7 +1048,7 @@ public class SharePartitionManagerTest {
|
|||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.build();
|
||||
|
||||
doAnswer(invocation -> {
|
||||
|
@ -1133,7 +1133,7 @@ public class SharePartitionManagerTest {
|
|||
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -1243,7 +1243,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
||||
|
@ -1384,7 +1384,7 @@ public class SharePartitionManagerTest {
|
|||
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
||||
|
@ -1432,7 +1432,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withShareGroupMetrics(shareGroupMetrics)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
@ -1504,7 +1504,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withShareGroupMetrics(shareGroupMetrics)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
@ -1544,7 +1544,7 @@ public class SharePartitionManagerTest {
|
|||
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
|
||||
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.withShareGroupMetrics(shareGroupMetrics)
|
||||
.build();
|
||||
|
@ -1587,7 +1587,7 @@ public class SharePartitionManagerTest {
|
|||
SharePartitionCache partitionCache = new SharePartitionCache();
|
||||
partitionCache.put(new SharePartitionKey(groupId, tp), sp);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
||||
|
@ -1700,7 +1700,7 @@ public class SharePartitionManagerTest {
|
|||
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -1809,7 +1809,7 @@ public class SharePartitionManagerTest {
|
|||
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -1913,7 +1913,7 @@ public class SharePartitionManagerTest {
|
|||
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
|
||||
|
||||
sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withCache(cache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
|
@ -2019,7 +2019,7 @@ public class SharePartitionManagerTest {
|
|||
topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
|
||||
|
||||
sharePartitionManager = spy(SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withCache(cache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
|
@ -2085,7 +2085,7 @@ public class SharePartitionManagerTest {
|
|||
when(time.hiResClockMs()).thenReturn(100L);
|
||||
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTime(time)
|
||||
.withShareGroupMetrics(shareGroupMetrics)
|
||||
|
@ -2155,7 +2155,7 @@ public class SharePartitionManagerTest {
|
|||
when(time.hiResClockMs()).thenReturn(100L);
|
||||
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTime(time)
|
||||
.withShareGroupMetrics(shareGroupMetrics)
|
||||
|
@ -2218,7 +2218,7 @@ public class SharePartitionManagerTest {
|
|||
mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -2284,7 +2284,7 @@ public class SharePartitionManagerTest {
|
|||
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -2410,7 +2410,7 @@ public class SharePartitionManagerTest {
|
|||
.thenThrow(new RuntimeException("Error creating instance"));
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
||||
|
@ -2450,7 +2450,7 @@ public class SharePartitionManagerTest {
|
|||
.thenReturn(partition);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withReplicaManager(replicaManager)
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.build();
|
||||
|
||||
|
@ -2528,7 +2528,7 @@ public class SharePartitionManagerTest {
|
|||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withReplicaManager(replicaManager)
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
.withTimer(mockTimer)
|
||||
.build();
|
||||
|
@ -2585,7 +2585,7 @@ public class SharePartitionManagerTest {
|
|||
doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -2647,7 +2647,7 @@ public class SharePartitionManagerTest {
|
|||
doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.withTimer(mockTimer)
|
||||
.withBrokerTopicStats(brokerTopicStats)
|
||||
|
@ -2804,25 +2804,6 @@ public class SharePartitionManagerTest {
|
|||
validateRotatedListEquals(topicIdPartitions, resultShareFetch.topicIdPartitions(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShareSessionCacheSupportsShareGroups() {
|
||||
ShareSessionCache cache1 = new ShareSessionCache(10, false);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache1)
|
||||
.build();
|
||||
// Toggle of supportsShareGroups from false to true.
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_1);
|
||||
assertTrue(cache1.supportsShareGroups());
|
||||
|
||||
ShareSessionCache cache2 = new ShareSessionCache(10, true);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache2)
|
||||
.build();
|
||||
// Toggle of supportsShareGroups from true to false.
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
|
||||
assertFalse(cache2.supportsShareGroups());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnShareVersionToggle() {
|
||||
String groupId = "grp";
|
||||
|
@ -2846,10 +2827,10 @@ public class SharePartitionManagerTest {
|
|||
new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3
|
||||
);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCacheMap(partitionCache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.build();
|
||||
assertEquals(4, partitionCache.size());
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
|
||||
// Because we are toggling to a share version which does not support share groups, the cache inside share partitions must be cleared.
|
||||
assertEquals(0, partitionCache.size());
|
||||
//Check if all share partitions have been fenced.
|
||||
|
@ -2859,6 +2840,111 @@ public class SharePartitionManagerTest {
|
|||
Mockito.verify(sp3).markFenced();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnShareVersionToggleWhenEnabledFromConfig() {
|
||||
SharePartition sp0 = mock(SharePartition.class);
|
||||
// Mock the share partitions corresponding to the topic partitions.
|
||||
SharePartitionCache partitionCache = new SharePartitionCache();
|
||||
partitionCache.put(
|
||||
new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))), sp0
|
||||
);
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withPartitionCache(partitionCache)
|
||||
.build();
|
||||
assertEquals(1, partitionCache.size());
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, true);
|
||||
// Though share version is toggled to off, but it's enabled from config, hence the cache should not be cleared.
|
||||
assertEquals(1, partitionCache.size());
|
||||
Mockito.verify(sp0, times(0)).markFenced();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShareGroupListener() {
|
||||
String groupId = "grp";
|
||||
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
|
||||
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
|
||||
Uuid memberId1 = Uuid.randomUuid();
|
||||
Uuid memberId2 = Uuid.randomUuid();
|
||||
|
||||
SharePartition sp0 = mock(SharePartition.class);
|
||||
SharePartition sp1 = mock(SharePartition.class);
|
||||
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
cache.maybeCreateSession(groupId, memberId1, new ImplicitLinkedHashCollection<>(), CONNECTION_ID);
|
||||
cache.maybeCreateSession(groupId, memberId2, new ImplicitLinkedHashCollection<>(), "id-2");
|
||||
|
||||
SharePartitionCache partitionCache = new SharePartitionCache();
|
||||
partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp0), k -> sp0);
|
||||
partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp1), k -> sp1);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.build();
|
||||
|
||||
assertEquals(2, cache.size());
|
||||
assertEquals(2, partitionCache.size());
|
||||
|
||||
// Invoke listeners by simulating connection disconnect for memberId1.
|
||||
cache.connectionDisconnectListener().onDisconnect(CONNECTION_ID);
|
||||
// Session cache should remove the memberId1.
|
||||
assertEquals(1, cache.size());
|
||||
// Partition cache should not remove the share partitions as the group is not empty.
|
||||
assertEquals(2, partitionCache.size());
|
||||
assertNotNull(cache.get(new ShareSessionKey(groupId, memberId2)));
|
||||
|
||||
// Invoke listeners by simulating connection disconnect for memberId2.
|
||||
cache.connectionDisconnectListener().onDisconnect("id-2");
|
||||
// Session cache should remove the memberId2.
|
||||
assertEquals(0, cache.size());
|
||||
// Partition cache should remove the share partitions as the group is empty.
|
||||
assertEquals(0, partitionCache.size());
|
||||
|
||||
Mockito.verify(sp0, times(1)).markFenced();
|
||||
Mockito.verify(sp1, times(1)).markFenced();
|
||||
Mockito.verify(mockReplicaManager, times(2)).removeListener(any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShareGroupListenerWithEmptyCache() {
|
||||
String groupId = "grp";
|
||||
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
|
||||
Uuid memberId1 = Uuid.randomUuid();
|
||||
|
||||
SharePartition sp0 = mock(SharePartition.class);
|
||||
|
||||
ShareSessionCache cache = new ShareSessionCache(10);
|
||||
cache.maybeCreateSession(groupId, memberId1, new ImplicitLinkedHashCollection<>(), CONNECTION_ID);
|
||||
|
||||
SharePartitionCache partitionCache = spy(new SharePartitionCache());
|
||||
partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp0), k -> sp0);
|
||||
|
||||
sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withCache(cache)
|
||||
.withPartitionCache(partitionCache)
|
||||
.withReplicaManager(mockReplicaManager)
|
||||
.build();
|
||||
|
||||
assertEquals(1, cache.size());
|
||||
assertEquals(1, partitionCache.size());
|
||||
|
||||
// Clean up share session and partition cache.
|
||||
sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
|
||||
assertEquals(0, cache.size());
|
||||
assertEquals(0, partitionCache.size());
|
||||
|
||||
Mockito.verify(sp0, times(1)).markFenced();
|
||||
Mockito.verify(mockReplicaManager, times(1)).removeListener(any(), any());
|
||||
Mockito.verify(partitionCache, times(0)).topicIdPartitionsForGroup(groupId);
|
||||
|
||||
// Invoke listeners by simulating connection disconnect for member. As the group is empty,
|
||||
// hence onGroupEmpty method should be invoked and should complete without any exception.
|
||||
cache.connectionDisconnectListener().onDisconnect(CONNECTION_ID);
|
||||
// Verify that the listener is called for the group.
|
||||
Mockito.verify(partitionCache, times(1)).topicIdPartitionsForGroup(groupId);
|
||||
}
|
||||
|
||||
private Timer systemTimerReaper() {
|
||||
return new SystemTimerReaper(
|
||||
TIMER_NAME_PREFIX + "-test-reaper",
|
||||
|
@ -3068,7 +3154,7 @@ public class SharePartitionManagerTest {
|
|||
private final Persister persister = new NoOpStatePersister();
|
||||
private ReplicaManager replicaManager = mock(ReplicaManager.class);
|
||||
private Time time = new MockTime();
|
||||
private ShareSessionCache cache = new ShareSessionCache(10, true);
|
||||
private ShareSessionCache cache = new ShareSessionCache(10);
|
||||
private SharePartitionCache partitionCache = new SharePartitionCache();
|
||||
private Timer timer = new MockTimer();
|
||||
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||
|
@ -3089,7 +3175,7 @@ public class SharePartitionManagerTest {
|
|||
return this;
|
||||
}
|
||||
|
||||
SharePartitionManagerBuilder withPartitionCacheMap(SharePartitionCache partitionCache) {
|
||||
SharePartitionManagerBuilder withPartitionCache(SharePartitionCache partitionCache) {
|
||||
this.partitionCache = partitionCache;
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -200,6 +200,7 @@ class BrokerMetadataPublisherTest {
|
|||
groupCoordinator,
|
||||
mock(classOf[TransactionCoordinator]),
|
||||
mock(classOf[ShareCoordinator]),
|
||||
mock(classOf[SharePartitionManager]),
|
||||
mock(classOf[DynamicConfigPublisher]),
|
||||
mock(classOf[DynamicClientQuotaPublisher]),
|
||||
mock(classOf[DynamicTopicClusterQuotaPublisher]),
|
||||
|
@ -208,7 +209,6 @@ class BrokerMetadataPublisherTest {
|
|||
mock(classOf[AclPublisher]),
|
||||
faultHandler,
|
||||
faultHandler,
|
||||
mock(classOf[SharePartitionManager])
|
||||
)
|
||||
|
||||
val image = MetadataImage.EMPTY
|
||||
|
@ -241,6 +241,7 @@ class BrokerMetadataPublisherTest {
|
|||
mock(classOf[GroupCoordinator]),
|
||||
mock(classOf[TransactionCoordinator]),
|
||||
mock(classOf[ShareCoordinator]),
|
||||
sharePartitionManager,
|
||||
mock(classOf[DynamicConfigPublisher]),
|
||||
mock(classOf[DynamicClientQuotaPublisher]),
|
||||
mock(classOf[DynamicTopicClusterQuotaPublisher]),
|
||||
|
@ -248,8 +249,7 @@ class BrokerMetadataPublisherTest {
|
|||
mock(classOf[DelegationTokenPublisher]),
|
||||
mock(classOf[AclPublisher]),
|
||||
faultHandler,
|
||||
faultHandler,
|
||||
sharePartitionManager
|
||||
faultHandler
|
||||
)
|
||||
|
||||
val featuresImage = new FeaturesImage(
|
||||
|
@ -289,6 +289,6 @@ class BrokerMetadataPublisherTest {
|
|||
)
|
||||
|
||||
// SharePartitionManager is receiving the latest changes.
|
||||
verify(sharePartitionManager).onShareVersionToggle(any())
|
||||
verify(sharePartitionManager).onShareVersionToggle(any(), any())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import com.yammer.metrics.core.Meter;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Caches share sessions.
|
||||
|
@ -70,10 +69,6 @@ public class ShareSessionCache {
|
|||
* from the cache when the respective client disconnects.
|
||||
*/
|
||||
private final Map<String, ShareSessionKey> connectionIdToSessionMap;
|
||||
/**
|
||||
* Flag indicating if share groups have been turned on.
|
||||
*/
|
||||
private final AtomicBoolean supportsShareGroups;
|
||||
/**
|
||||
* The listener for share group events. This is used to notify the listener when the group members
|
||||
* change.
|
||||
|
@ -84,9 +79,8 @@ public class ShareSessionCache {
|
|||
private long numPartitions = 0;
|
||||
|
||||
@SuppressWarnings("this-escape")
|
||||
public ShareSessionCache(int maxEntries, boolean supportsShareGroups) {
|
||||
public ShareSessionCache(int maxEntries) {
|
||||
this.maxEntries = maxEntries;
|
||||
this.supportsShareGroups = new AtomicBoolean(supportsShareGroups);
|
||||
// Register metrics for ShareSessionCache.
|
||||
KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "ShareSessionCache");
|
||||
metricsGroup.newGauge(SHARE_SESSIONS_COUNT, this::size);
|
||||
|
@ -189,9 +183,7 @@ public class ShareSessionCache {
|
|||
* @param session The session.
|
||||
*/
|
||||
public synchronized void updateNumPartitions(ShareSession session) {
|
||||
if (supportsShareGroups.get()) {
|
||||
numPartitions += session.updateCachedSize();
|
||||
}
|
||||
numPartitions += session.updateCachedSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -208,7 +200,7 @@ public class ShareSessionCache {
|
|||
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
|
||||
String clientConnectionId
|
||||
) {
|
||||
if (sessions.size() < maxEntries && supportsShareGroups.get()) {
|
||||
if (sessions.size() < maxEntries) {
|
||||
ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap,
|
||||
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
|
||||
sessions.put(session.key(), session);
|
||||
|
@ -252,17 +244,4 @@ public class ShareSessionCache {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the value of supportsShareGroups to reflect if share groups are turned on.
|
||||
* @param supportsShareGroups - Boolean indicating if share groups are turned on.
|
||||
*/
|
||||
public void updateSupportsShareGroups(boolean supportsShareGroups) {
|
||||
this.supportsShareGroups.set(supportsShareGroups);
|
||||
}
|
||||
|
||||
// Visible for testing.
|
||||
public boolean supportsShareGroups() {
|
||||
return supportsShareGroups.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class ShareSessionCacheTest {
|
|||
|
||||
@Test
|
||||
public void testShareSessionCache() throws InterruptedException {
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
assertEquals(0, cache.size());
|
||||
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
|
||||
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20), "conn-2");
|
||||
|
@ -59,7 +59,7 @@ public class ShareSessionCacheTest {
|
|||
|
||||
@Test
|
||||
public void testResizeCachedSessions() throws InterruptedException {
|
||||
ShareSessionCache cache = new ShareSessionCache(2, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(2);
|
||||
assertEquals(0, cache.size());
|
||||
assertEquals(0, cache.totalPartitions());
|
||||
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-1");
|
||||
|
@ -113,7 +113,7 @@ public class ShareSessionCacheTest {
|
|||
|
||||
@Test
|
||||
public void testRemoveConnection() throws InterruptedException {
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
assertEquals(0, cache.size());
|
||||
ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(1), "conn-1");
|
||||
ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(2), "conn-2");
|
||||
|
@ -143,7 +143,7 @@ public class ShareSessionCacheTest {
|
|||
|
||||
@Test
|
||||
public void testRemoveAllSessions() {
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
assertEquals(0, cache.size());
|
||||
assertEquals(0, cache.totalPartitions());
|
||||
cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10), "conn-1");
|
||||
|
@ -159,7 +159,7 @@ public class ShareSessionCacheTest {
|
|||
@Test
|
||||
public void testShareGroupListenerEvents() {
|
||||
ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class);
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
cache.registerShareGroupListener(mockListener);
|
||||
|
||||
String groupId = "grp";
|
||||
|
@ -206,7 +206,7 @@ public class ShareSessionCacheTest {
|
|||
@Test
|
||||
public void testShareGroupListenerEventsMultipleGroups() {
|
||||
ShareGroupListener mockListener = Mockito.mock(ShareGroupListener.class);
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
cache.registerShareGroupListener(mockListener);
|
||||
|
||||
String groupId1 = "grp1";
|
||||
|
@ -237,7 +237,7 @@ public class ShareSessionCacheTest {
|
|||
|
||||
@Test
|
||||
public void testNoShareGroupListenerRegistered() {
|
||||
ShareSessionCache cache = new ShareSessionCache(3, true);
|
||||
ShareSessionCache cache = new ShareSessionCache(3);
|
||||
|
||||
String groupId = "grp";
|
||||
Uuid memberId = Uuid.randomUuid();
|
||||
|
|
Loading…
Reference in New Issue