KAFKA-18539 Remove optional managers in KafkaApis (#18550)

Removed Optional for SharePartitionManager and ClientMetricsManager since the ZooKeeper code is being removed. Also removed the asScala and asJava conversions in KafkaApis.handleListClientMetricsResources, switching to a Java stream.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Apoorv Mittal 2025-01-15 20:46:05 +00:00 committed by GitHub
parent 74484d223a
commit 3fa998475b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 49 additions and 90 deletions

View File

@ -45,8 +45,6 @@ import java.util.Optional;
import scala.jdk.javaapi.OptionConverters;
public class KafkaApisBuilder {
private RequestChannel requestChannel = null;
private MetadataSupport metadataSupport = null;
@ -62,13 +60,13 @@ public class KafkaApisBuilder {
private Optional<Authorizer> authorizer = Optional.empty();
private QuotaManagers quotas = null;
private FetchManager fetchManager = null;
private Optional<SharePartitionManager> sharePartitionManager = Optional.empty();
private SharePartitionManager sharePartitionManager = null;
private BrokerTopicStats brokerTopicStats = null;
private String clusterId = "clusterId";
private Time time = Time.SYSTEM;
private DelegationTokenManager tokenManager = null;
private ApiVersionManager apiVersionManager = null;
private Optional<ClientMetricsManager> clientMetricsManager = Optional.empty();
private ClientMetricsManager clientMetricsManager = null;
private Optional<ShareCoordinator> shareCoordinator = Optional.empty();
public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
@ -146,7 +144,7 @@ public class KafkaApisBuilder {
return this;
}
public KafkaApisBuilder setSharePartitionManager(Optional<SharePartitionManager> sharePartitionManager) {
public KafkaApisBuilder setSharePartitionManager(SharePartitionManager sharePartitionManager) {
this.sharePartitionManager = sharePartitionManager;
return this;
}
@ -176,11 +174,12 @@ public class KafkaApisBuilder {
return this;
}
public KafkaApisBuilder setClientMetricsManager(Optional<ClientMetricsManager> clientMetricsManager) {
public KafkaApisBuilder setClientMetricsManager(ClientMetricsManager clientMetricsManager) {
this.clientMetricsManager = clientMetricsManager;
return this;
}
@SuppressWarnings({"CyclomaticComplexity"})
public KafkaApis build() {
if (requestChannel == null) throw new RuntimeException("you must set requestChannel");
if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport");
@ -195,6 +194,8 @@ public class KafkaApisBuilder {
if (metrics == null) throw new RuntimeException("You must set metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set fetchManager");
if (sharePartitionManager == null) throw new RuntimeException("You must set sharePartitionManager");
if (clientMetricsManager == null) throw new RuntimeException("You must set clientMetricsManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");
@ -213,12 +214,12 @@ public class KafkaApisBuilder {
OptionConverters.toScala(authorizer),
quotas,
fetchManager,
OptionConverters.toScala(sharePartitionManager),
sharePartitionManager,
brokerTopicStats,
clusterId,
time,
tokenManager,
apiVersionManager,
OptionConverters.toScala(clientMetricsManager));
clientMetricsManager);
}
}

View File

@ -464,13 +464,13 @@ class BrokerServer(
authorizer = authorizer,
quotas = quotaManagers,
fetchManager = fetchManager,
sharePartitionManager = Some(sharePartitionManager),
sharePartitionManager = sharePartitionManager,
brokerTopicStats = brokerTopicStats,
clusterId = clusterId,
time = time,
tokenManager = tokenManager,
apiVersionManager = apiVersionManager,
clientMetricsManager = Some(clientMetricsManager))
clientMetricsManager = clientMetricsManager)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,

View File

@ -97,13 +97,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val fetchManager: FetchManager,
val sharePartitionManager: Option[SharePartitionManager],
val sharePartitionManager: SharePartitionManager,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time,
val tokenManager: DelegationTokenManager,
val apiVersionManager: ApiVersionManager,
val clientMetricsManager: Option[ClientMetricsManager]
val clientMetricsManager: ClientMetricsManager
) extends ApiRequestHandler with Logging {
type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
@ -2655,35 +2655,21 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
clientMetricsManager match {
case Some(metricsManager) =>
try {
requestHelper.sendMaybeThrottle(request, metricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
case None =>
info("Received get telemetry client request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
try {
requestHelper.sendMaybeThrottle(request, clientMetricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
}
def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = {
private def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = {
val pushTelemetryRequest = request.body[PushTelemetryRequest]
clientMetricsManager match {
case Some(metricsManager) =>
try {
requestHelper.sendMaybeThrottle(request, metricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
case None =>
info("Received push telemetry client request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
try {
requestHelper.sendMaybeThrottle(request, clientMetricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
}
@ -2693,18 +2679,10 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
clientMetricsManager match {
case Some(metricsManager) =>
val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
metricsManager.listClientMetricsResources.asScala.map(
name => new ClientMetricsResource().setName(name)).toList.asJava)
requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
case None =>
// This should never happen as ZK based cluster calls should get rejected earlier itself,
// but we should handle it gracefully.
info("Received list client metrics resources request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
}
val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
clientMetricsManager.listClientMetricsResources.stream.map(
name => new ClientMetricsResource().setName(name)).toList)
requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
}
}
@ -2796,14 +2774,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return
}
val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match {
case Some(manager) => manager
case None =>
// The API is not supported when the SharePartitionManager is not defined on the broker
info("Received share fetch request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return
}
val groupId = shareFetchRequest.data.groupId
@ -2833,7 +2803,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
// Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
shareFetchContext = sharePartitionManagerInstance.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent)
} catch {
case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
@ -2872,7 +2842,7 @@ class KafkaApis(val requestChannel: RequestChannel,
acknowledgeResult = handleAcknowledgements(
acknowledgementDataFromRequest,
erroneous,
sharePartitionManagerInstance,
sharePartitionManager,
authorizedTopics,
groupId,
memberId,
@ -2885,7 +2855,7 @@ class KafkaApis(val requestChannel: RequestChannel,
handleFetchFromShareFetchRequest(
request,
erroneousAndValidPartitionData,
sharePartitionManagerInstance,
sharePartitionManager,
authorizedTopics
)
@ -2952,7 +2922,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) {
sharePartitionManagerInstance.releaseSession(groupId, memberId).
sharePartitionManager.releaseSession(groupId, memberId).
whenComplete((releaseAcquiredRecordsData, throwable) =>
if (throwable != null) {
error(s"Releasing share session close with correlation from client ${request.header.clientId} " +
@ -3117,15 +3087,6 @@ class KafkaApis(val requestChannel: RequestChannel,
return
}
val sharePartitionManagerInstance: SharePartitionManager = sharePartitionManager match {
case Some(manager) => manager
case None =>
// The API is not supported when the SharePartitionManager is not defined on the broker
info("Received share acknowledge request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request,
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception))
return
}
val groupId = shareAcknowledgeRequest.data.groupId
// Share Acknowledge needs permission to perform READ action on the named group resource (groupId)
@ -3141,7 +3102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
// Updating the cache for Share Session Handling
sharePartitionManagerInstance.acknowledgeSessionUpdate(groupId, newReqMetadata)
sharePartitionManager.acknowledgeSessionUpdate(groupId, newReqMetadata)
} catch {
case e: Exception =>
requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
@ -3170,13 +3131,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]()
val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicIdNames, erroneous)
handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManagerInstance, authorizedTopics, groupId, memberId)
handleAcknowledgements(acknowledgementDataFromRequest, erroneous, sharePartitionManager, authorizedTopics, groupId, memberId)
.handle[Unit] {(result, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, exception))
} else {
if (shareSessionEpoch == ShareRequestMetadata.FINAL_EPOCH) {
sharePartitionManagerInstance.releaseSession(groupId, memberId).
sharePartitionManager.releaseSession(groupId, memberId).
whenComplete{ (releaseAcquiredRecordsData, throwable) =>
if (throwable != null) {
debug(s"Releasing share session close with correlation from client ${request.header.clientId} " +

View File

@ -185,8 +185,6 @@ class KafkaApisTest extends Logging {
true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
val clientMetricsManagerOpt = Some(clientMetricsManager)
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
setupFeatures(featureVersions)
@ -206,13 +204,13 @@ class KafkaApisTest extends Logging {
authorizer = authorizer,
quotas = quotas,
fetchManager = fetchManager,
sharePartitionManager = Some(sharePartitionManager),
sharePartitionManager = sharePartitionManager,
brokerTopicStats = brokerTopicStats,
clusterId = clusterId,
time = time,
tokenManager = null,
apiVersionManager = apiVersionManager,
clientMetricsManager = clientMetricsManagerOpt)
clientMetricsManager = clientMetricsManager)
}
private def setupFeatures(featureVersions: Seq[FeatureVersion]): Unit = {
@ -9665,8 +9663,7 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -9692,8 +9689,7 @@ class KafkaApisTest extends Logging {
consumerGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -9715,8 +9711,7 @@ class KafkaApisTest extends Logging {
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -9741,8 +9736,7 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -9812,8 +9806,7 @@ class KafkaApisTest extends Logging {
future.complete(List().asJava)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@ -9835,8 +9828,7 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

View File

@ -36,6 +36,7 @@ import kafka.server.SimpleApiVersionManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -118,6 +120,8 @@ public class KRaftMetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Optional.empty());
private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
private final SharePartitionManager sharePartitionManager = Mockito.mock(SharePartitionManager.class);
private final ClientMetricsManager clientMetricsManager = Mockito.mock(ClientMetricsManager.class);
private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
@Param({"500", "1000", "5000"})
@ -200,7 +204,8 @@ public class KRaftMetadataRequestBenchmark {
setAuthorizer(Optional.empty()).
setQuotas(quotaManagers).
setFetchManager(fetchManager).
setSharePartitionManager(Optional.empty()).
setSharePartitionManager(sharePartitionManager).
setClientMetricsManager(clientMetricsManager).
setBrokerTopicStats(brokerTopicStats).
setClusterId("clusterId").
setTime(Time.SYSTEM).