diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 87f0277d196..f8f364f06f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -114,7 +114,8 @@ public class ApiVersionsResponse extends AbstractResponse { NodeApiVersions controllerApiVersions, ListenerType listenerType, boolean enableUnstableLastVersion, - boolean zkMigrationEnabled + boolean zkMigrationEnabled, + boolean clientTelemetryEnabled ) { ApiVersionCollection apiKeys; if (controllerApiVersions != null) { @@ -122,13 +123,15 @@ public class ApiVersionsResponse extends AbstractResponse { listenerType, minRecordVersion, controllerApiVersions.allSupportedApiVersions(), - enableUnstableLastVersion + enableUnstableLastVersion, + clientTelemetryEnabled ); } else { apiKeys = filterApis( minRecordVersion, listenerType, - enableUnstableLastVersion + enableUnstableLastVersion, + clientTelemetryEnabled ); } @@ -167,16 +170,21 @@ public class ApiVersionsResponse extends AbstractResponse { RecordVersion minRecordVersion, ApiMessageType.ListenerType listenerType ) { - return filterApis(minRecordVersion, listenerType, false); + return filterApis(minRecordVersion, listenerType, false, false); } public static ApiVersionCollection filterApis( RecordVersion minRecordVersion, ApiMessageType.ListenerType listenerType, - boolean enableUnstableLastVersion + boolean enableUnstableLastVersion, + boolean clientTelemetryEnabled ) { ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { + // Skip telemetry APIs if client telemetry is disabled. + if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled) + continue; + if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) { apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add); } @@ -203,13 +211,15 @@ public class ApiVersionsResponse extends AbstractResponse { * @param minRecordVersion min inter broker magic * @param activeControllerApiVersions controller ApiVersions * @param enableUnstableLastVersion whether unstable versions should be advertised or not + * @param clientTelemetryEnabled whether client telemetry is enabled or not * @return commonly agreed ApiVersion collection */ public static ApiVersionCollection intersectForwardableApis( final ApiMessageType.ListenerType listenerType, final RecordVersion minRecordVersion, final Map activeControllerApiVersions, - boolean enableUnstableLastVersion + boolean enableUnstableLastVersion, + boolean clientTelemetryEnabled ) { ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { @@ -220,6 +230,10 @@ public class ApiVersionsResponse extends AbstractResponse { continue; } + // Skip telemetry APIs if client telemetry is disabled. + if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled) + continue; + final ApiVersion finalApiVersion; if (!apiKey.forwardable) { finalApiVersion = brokerApiVersion.get(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 19d3c468186..cf832050f2a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -103,7 +103,8 @@ public class ApiVersionsResponseTest { ApiMessageType.ListenerType.ZK_BROKER, RecordVersion.current(), activeControllerApiVersions, - true + true, + false ); verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse); @@ -123,7 +124,8 @@ public class ApiVersionsResponseTest { null, ListenerType.ZK_BROKER, true, - false + false, + true ); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); assertEquals(10, response.throttleTimeMs()); @@ -144,7 +146,8 @@ public class ApiVersionsResponseTest { null, ListenerType.ZK_BROKER, true, - false + false, + true ); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); @@ -174,7 +177,8 @@ public class ApiVersionsResponseTest { null, listenerType, true, - false + false, + true ); assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); @@ -183,6 +187,40 @@ public class ApiVersionsResponseTest { assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch()); } + @Test + public void shouldCreateApiResponseWithTelemetryWhenEnabled() { + ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( + 10, + RecordVersion.V1, + Features.emptySupportedFeatures(), + Collections.emptyMap(), + ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, + null, + ListenerType.BROKER, + true, + false, + true + ); + verifyApiKeysForTelemetry(response, 2); + } + + @Test + public void shouldNotCreateApiResponseWithTelemetryWhenDisabled() { + ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( + 10, + RecordVersion.V1, + Features.emptySupportedFeatures(), + Collections.emptyMap(), + ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, + null, + ListenerType.BROKER, + true, + false, + false + ); + verifyApiKeysForTelemetry(response, 0); + } + @Test public void testMetadataQuorumApisAreDisabled() { ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( @@ -194,7 +232,8 @@ public class ApiVersionsResponseTest { null, ListenerType.ZK_BROKER, true, - false + false, + true ); // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them @@ -254,6 +293,16 @@ public class ApiVersionsResponseTest { } } + private void verifyApiKeysForTelemetry(ApiVersionsResponse response, int expectedCount) { + int count = 0; + for (ApiVersion version : response.data().apiKeys()) { + if (version.apiKey() == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.id || version.apiKey() == ApiKeys.PUSH_TELEMETRY.id) { + count++; + } + } + assertEquals(expectedCount, count); + } + private HashSet apiKeysInResponse(ApiVersionsResponse apiVersions) { HashSet apiKeys = new HashSet<>(); for (ApiVersion version : apiVersions.data().apiKeys()) { diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 3fc51f23a2b..c8a6db6f6ca 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -600,7 +600,7 @@ public class TestUtils { ) { return createApiVersionsResponse( throttleTimeMs, - ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, true), + ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, true, true), Features.emptySupportedFeatures(), false ); @@ -613,7 +613,7 @@ public class TestUtils { ) { return createApiVersionsResponse( throttleTimeMs, - ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion), + ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion, true), Features.emptySupportedFeatures(), false ); diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index fa796bc6688..6e1057469df 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -22,6 +22,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.ApiVersionsResponse +import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.common.Features import scala.jdk.CollectionConverters._ @@ -47,7 +48,8 @@ object ApiVersionManager { config: KafkaConfig, forwardingManager: Option[ForwardingManager], supportedFeatures: BrokerFeatures, - metadataCache: MetadataCache + metadataCache: MetadataCache, + clientMetricsManager: Option[ClientMetricsManager] ): ApiVersionManager = { new DefaultApiVersionManager( listenerType, @@ -55,7 +57,8 @@ object ApiVersionManager { supportedFeatures, metadataCache, config.unstableApiVersionsEnabled, - config.migrationEnabled + config.migrationEnabled, + clientMetricsManager ) } } @@ -123,6 +126,7 @@ class SimpleApiVersionManager( * @param metadataCache the metadata cache, used to get the finalized features and the metadata version * @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]] * @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]] + * @param clientMetricsManager the client metrics manager, helps to determine whether client telemetry is enabled */ class DefaultApiVersionManager( val listenerType: ListenerType, @@ -130,7 +134,8 @@ class DefaultApiVersionManager( brokerFeatures: BrokerFeatures, metadataCache: MetadataCache, val enableUnstableLastVersion: Boolean, - val zkMigrationEnabled: Boolean = false + val zkMigrationEnabled: Boolean = false, + val clientMetricsManager: Option[ClientMetricsManager] = None ) extends ApiVersionManager { val enabledApis = ApiKeys.apisForListener(listenerType).asScala @@ -139,6 +144,10 @@ class DefaultApiVersionManager( val supportedFeatures = brokerFeatures.supportedFeatures val finalizedFeatures = metadataCache.features() val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) + val clientTelemetryEnabled = clientMetricsManager match { + case Some(manager) => manager.isTelemetryReceiverConfigured + case None => false + } ApiVersionsResponse.createApiVersionsResponse( throttleTimeMs, @@ -149,7 +158,8 @@ class DefaultApiVersionManager( controllerApiVersions.orNull, listenerType, enableUnstableLastVersion, - zkMigrationEnabled + zkMigrationEnabled, + clientTelemetryEnabled ) } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 4d5731a3fdc..836818ef8b4 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -235,14 +235,15 @@ class BrokerServer( ) clientToControllerChannelManager.start() forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager) - + clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time) val apiVersionManager = ApiVersionManager( ListenerType.BROKER, config, Some(forwardingManager), brokerFeatures, - metadataCache + metadataCache, + Some(clientMetricsManager) ) // Create and start the socket server acceptor threads so that the bound port is known. @@ -347,8 +348,6 @@ class BrokerServer( config, Some(clientToControllerChannelManager), None, None, groupCoordinator, transactionCoordinator) - clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time) - dynamicConfigHandlers = Map[String, ConfigHandler]( ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers), diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1ac38a0895d..9464fd5023f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3775,16 +3775,38 @@ class KafkaApis(val requestChannel: RequestChannel, } - // Just a place holder for now. def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { - requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] + + clientMetricsManager match { + case Some(metricsManager) => + try { + requestHelper.sendMaybeThrottle(request, metricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context)) + } catch { + case _: Exception => + requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) + } + case None => + info("Received get telemetry client request for zookeeper based cluster") + requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + } } - // Just a place holder for now. def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = { - requestHelper.sendMaybeThrottle(request, request.body[PushTelemetryRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val pushTelemetryRequest = request.body[PushTelemetryRequest] + + clientMetricsManager match { + case Some(metricsManager) => + try { + requestHelper.sendMaybeThrottle(request, metricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context)) + } catch { + case _: Exception => + requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception)) + } + case None => + info("Received push telemetry client request for zookeeper based cluster") + requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + } } private def updateRecordConversionStats(request: RequestChannel.Request, diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2a43f7f5e28..3c9c6076623 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -365,7 +365,8 @@ class KafkaServer( config, forwardingManager, brokerFeatures, - metadataCache + metadataCache, + None ) // Create and start the socket server acceptor threads so that the bound port is known. diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala index c3eca854ccc..06ee92ff968 100644 --- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala +++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala @@ -87,7 +87,9 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}" val usableVersion = nodeApiVersions.latestUsableVersion(apiKey) - val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" + val line = + if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED$terminator" + else s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" assertTrue(lineIter.hasNext) assertEquals(line, lineIter.next()) } else { diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 5276d030cb8..5b9e0aafee2 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -74,6 +74,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = cluster.clientListener(), enableUnstableLastVersion: Boolean = false, + clientTelemetryEnabled: Boolean = false, apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion ): Unit = { if (cluster.isKRaftTest && apiVersion >= 3) { @@ -100,7 +101,8 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { ApiMessageType.ListenerType.BROKER, RecordVersion.current, NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions(), - enableUnstableLastVersion + enableUnstableLastVersion, + clientTelemetryEnabled ) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 82ef89d10b7..bf5ce8750d2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -95,6 +95,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} import org.apache.kafka.coordinator.group.GroupCoordinator +import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.common.{Features, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.metrics.ClientMetricsTestUtils @@ -129,6 +130,7 @@ class KafkaApisTest { private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager, clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None) private val fetchManager: FetchManager = mock(classOf[FetchManager]) + private val clientMetricsManager: ClientMetricsManager = mock(classOf[ClientMetricsManager]) private val brokerTopicStats = new BrokerTopicStats private val clusterId = "clusterId" private val time = new MockTime @@ -196,6 +198,8 @@ class KafkaApisTest { false, () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport)) + val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None + new KafkaApis( requestChannel = requestChannel, metadataSupport = metadataSupport, @@ -216,7 +220,7 @@ class KafkaApisTest { time = time, tokenManager = null, apiVersionManager = apiVersionManager, - clientMetricsManager = null) + clientMetricsManager = clientMetricsManagerOpt) } @Test @@ -6840,18 +6844,39 @@ class KafkaApisTest { } @Test - def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = { - val data = new GetTelemetrySubscriptionsRequestData() + def testGetTelemetrySubscriptions(): Unit = { + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build()) - val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) - val errorCode = Errors.UNSUPPORTED_VERSION.code - val expectedResponse = new GetTelemetrySubscriptionsResponseData() - expectedResponse.setErrorCode(errorCode) + when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true) + when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](), + any[RequestContext]())).thenReturn(new GetTelemetrySubscriptionsResponse( + new GetTelemetrySubscriptionsResponseData())) metadataCache = MetadataCache.kRaftMetadataCache(brokerId) createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + val expectedResponse = new GetTelemetrySubscriptionsResponseData() + assertEquals(expectedResponse, response.data) + } + + @Test + def testGetTelemetrySubscriptionsWithException(): Unit = { + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build()) + + when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true) + when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](), + any[RequestContext]())).thenThrow(new RuntimeException("test")) + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + + val expectedResponse = new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } @@ -6867,18 +6892,34 @@ class KafkaApisTest { } @Test - def testPushTelemetryUnsupportedVersionForKRaftClusters(): Unit = { - val data = new PushTelemetryRequestData() + def testPushTelemetry(): Unit = { + val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build()) - val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build()) - val errorCode = Errors.UNSUPPORTED_VERSION.code - val expectedResponse = new PushTelemetryResponseData() - expectedResponse.setErrorCode(errorCode) + when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true) + when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]())) + .thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData())) metadataCache = MetadataCache.kRaftMetadataCache(brokerId) createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) val response = verifyNoThrottling[PushTelemetryResponse](request) + val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.NONE.code) + assertEquals(expectedResponse, response.data) + } + + @Test + def testPushTelemetryWithException(): Unit = { + val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build()) + + when(clientMetricsManager.isTelemetryReceiverConfigured()).thenReturn(true) + when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]())) + .thenThrow(new RuntimeException("test")) + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) + val response = verifyNoThrottling[PushTelemetryResponse](request) + + val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } } diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java index 7108155b07e..c608510350a 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -32,7 +33,7 @@ public class ClientMetricsReceiverPlugin { private final List receivers; public ClientMetricsReceiverPlugin() { - this.receivers = new ArrayList<>(); + this.receivers = Collections.synchronizedList(new ArrayList<>()); } public boolean isEmpty() {