KAFKA-15830: Add telemetry API handling (KIP-714) (#14767)

The PR adds handling of the telemetry APIs in KafkaApis.scala, which calls the respective manager to handle the API calls. It also registers the telemetry plugin, if one is configured, so that client metrics can be exported.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Apoorv Mittal 2023-12-05 05:30:35 +05:30 committed by GitHub
parent a83bc2d977
commit 463ed09f4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 185 additions and 44 deletions

View File

@ -114,7 +114,8 @@ public class ApiVersionsResponse extends AbstractResponse {
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled
boolean zkMigrationEnabled,
boolean clientTelemetryEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
@ -122,13 +123,15 @@ public class ApiVersionsResponse extends AbstractResponse {
listenerType,
minRecordVersion,
controllerApiVersions.allSupportedApiVersions(),
enableUnstableLastVersion
enableUnstableLastVersion,
clientTelemetryEnabled
);
} else {
apiKeys = filterApis(
minRecordVersion,
listenerType,
enableUnstableLastVersion
enableUnstableLastVersion,
clientTelemetryEnabled
);
}
@ -167,16 +170,21 @@ public class ApiVersionsResponse extends AbstractResponse {
RecordVersion minRecordVersion,
ApiMessageType.ListenerType listenerType
) {
return filterApis(minRecordVersion, listenerType, false);
return filterApis(minRecordVersion, listenerType, false, false);
}
public static ApiVersionCollection filterApis(
RecordVersion minRecordVersion,
ApiMessageType.ListenerType listenerType,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean clientTelemetryEnabled
) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
// Skip telemetry APIs if client telemetry is disabled.
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
continue;
if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
}
@ -203,13 +211,15 @@ public class ApiVersionsResponse extends AbstractResponse {
* @param minRecordVersion min inter broker magic
* @param activeControllerApiVersions controller ApiVersions
* @param enableUnstableLastVersion whether unstable versions should be advertised or not
* @param clientTelemetryEnabled whether client telemetry is enabled or not
* @return commonly agreed ApiVersion collection
*/
public static ApiVersionCollection intersectForwardableApis(
final ApiMessageType.ListenerType listenerType,
final RecordVersion minRecordVersion,
final Map<ApiKeys, ApiVersion> activeControllerApiVersions,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean clientTelemetryEnabled
) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
@ -220,6 +230,10 @@ public class ApiVersionsResponse extends AbstractResponse {
continue;
}
// Skip telemetry APIs if client telemetry is disabled.
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
continue;
final ApiVersion finalApiVersion;
if (!apiKey.forwardable) {
finalApiVersion = brokerApiVersion.get();

View File

@ -103,7 +103,8 @@ public class ApiVersionsResponseTest {
ApiMessageType.ListenerType.ZK_BROKER,
RecordVersion.current(),
activeControllerApiVersions,
true
true,
false
);
verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse);
@ -123,7 +124,8 @@ public class ApiVersionsResponseTest {
null,
ListenerType.ZK_BROKER,
true,
false
false,
true
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
@ -144,7 +146,8 @@ public class ApiVersionsResponseTest {
null,
ListenerType.ZK_BROKER,
true,
false
false,
true
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
@ -174,7 +177,8 @@ public class ApiVersionsResponseTest {
null,
listenerType,
true,
false
false,
true
);
assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
@ -183,6 +187,40 @@ public class ApiVersionsResponseTest {
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
}
@Test
public void shouldCreateApiResponseWithTelemetryWhenEnabled() {
// Builds an ApiVersions response with clientTelemetryEnabled (the final
// argument) set to true, so both telemetry APIs — GET_TELEMETRY_SUBSCRIPTIONS
// and PUSH_TELEMETRY — should be advertised; hence the expected count of 2.
ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
10,
RecordVersion.V1,
Features.emptySupportedFeatures(),
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.BROKER,
true,
false,
true
);
verifyApiKeysForTelemetry(response, 2);
}
@Test
public void shouldNotCreateApiResponseWithTelemetryWhenDisabled() {
// Builds an ApiVersions response with clientTelemetryEnabled (the final
// argument) set to false, so neither GET_TELEMETRY_SUBSCRIPTIONS nor
// PUSH_TELEMETRY should be advertised; hence the expected count of 0.
ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
10,
RecordVersion.V1,
Features.emptySupportedFeatures(),
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.BROKER,
true,
false,
false
);
verifyApiKeysForTelemetry(response, 0);
}
@Test
public void testMetadataQuorumApisAreDisabled() {
ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
@ -194,7 +232,8 @@ public class ApiVersionsResponseTest {
null,
ListenerType.ZK_BROKER,
true,
false
false,
true
);
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
@ -254,6 +293,16 @@ public class ApiVersionsResponseTest {
}
}
/**
 * Asserts that the response advertises exactly {@code expectedCount}
 * telemetry APIs (GET_TELEMETRY_SUBSCRIPTIONS / PUSH_TELEMETRY).
 */
private void verifyApiKeysForTelemetry(ApiVersionsResponse response, int expectedCount) {
    int telemetryApis = 0;
    for (ApiVersion apiVersion : response.data().apiKeys()) {
        short id = apiVersion.apiKey();
        boolean isTelemetryApi =
            id == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS.id || id == ApiKeys.PUSH_TELEMETRY.id;
        if (isTelemetryApi) {
            telemetryApis++;
        }
    }
    assertEquals(expectedCount, telemetryApis);
}
private HashSet<ApiKeys> apiKeysInResponse(ApiVersionsResponse apiVersions) {
HashSet<ApiKeys> apiKeys = new HashSet<>();
for (ApiVersion version : apiVersions.data().apiKeys()) {

View File

@ -600,7 +600,7 @@ public class TestUtils {
) {
return createApiVersionsResponse(
throttleTimeMs,
ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, true),
ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, true, true),
Features.emptySupportedFeatures(),
false
);
@ -613,7 +613,7 @@ public class TestUtils {
) {
return createApiVersionsResponse(
throttleTimeMs,
ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion, true),
Features.emptySupportedFeatures(),
false
);

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.Features
import scala.jdk.CollectionConverters._
@ -47,7 +48,8 @@ object ApiVersionManager {
config: KafkaConfig,
forwardingManager: Option[ForwardingManager],
supportedFeatures: BrokerFeatures,
metadataCache: MetadataCache
metadataCache: MetadataCache,
clientMetricsManager: Option[ClientMetricsManager]
): ApiVersionManager = {
new DefaultApiVersionManager(
listenerType,
@ -55,7 +57,8 @@ object ApiVersionManager {
supportedFeatures,
metadataCache,
config.unstableApiVersionsEnabled,
config.migrationEnabled
config.migrationEnabled,
clientMetricsManager
)
}
}
@ -123,6 +126,7 @@ class SimpleApiVersionManager(
* @param metadataCache the metadata cache, used to get the finalized features and the metadata version
* @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
* @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
* @param clientMetricsManager the client metrics manager, helps to determine whether client telemetry is enabled
*/
class DefaultApiVersionManager(
val listenerType: ListenerType,
@ -130,7 +134,8 @@ class DefaultApiVersionManager(
brokerFeatures: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean = false
val zkMigrationEnabled: Boolean = false,
val clientMetricsManager: Option[ClientMetricsManager] = None
) extends ApiVersionManager {
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
@ -139,6 +144,10 @@ class DefaultApiVersionManager(
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeatures = metadataCache.features()
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
val clientTelemetryEnabled = clientMetricsManager match {
case Some(manager) => manager.isTelemetryReceiverConfigured
case None => false
}
ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
@ -149,7 +158,8 @@ class DefaultApiVersionManager(
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion,
zkMigrationEnabled
zkMigrationEnabled,
clientTelemetryEnabled
)
}

View File

@ -235,14 +235,15 @@ class BrokerServer(
)
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time)
val apiVersionManager = ApiVersionManager(
ListenerType.BROKER,
config,
Some(forwardingManager),
brokerFeatures,
metadataCache
metadataCache,
Some(clientMetricsManager)
)
// Create and start the socket server acceptor threads so that the bound port is known.
@ -347,8 +348,6 @@ class BrokerServer(
config, Some(clientToControllerChannelManager), None, None,
groupCoordinator, transactionCoordinator)
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time)
dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers),

View File

@ -3775,16 +3775,38 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Just a place holder for now.
def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
clientMetricsManager match {
case Some(metricsManager) =>
try {
requestHelper.sendMaybeThrottle(request, metricsManager.processGetTelemetrySubscriptionRequest(subscriptionRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
case None =>
info("Received get telemetry client request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, subscriptionRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
}
}
// Just a place holder for now.
def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = {
requestHelper.sendMaybeThrottle(request, request.body[PushTelemetryRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
val pushTelemetryRequest = request.body[PushTelemetryRequest]
clientMetricsManager match {
case Some(metricsManager) =>
try {
requestHelper.sendMaybeThrottle(request, metricsManager.processPushTelemetryRequest(pushTelemetryRequest, request.context))
} catch {
case _: Exception =>
requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
}
case None =>
info("Received push telemetry client request for zookeeper based cluster")
requestHelper.sendMaybeThrottle(request, pushTelemetryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
}
}
private def updateRecordConversionStats(request: RequestChannel.Request,

View File

@ -365,7 +365,8 @@ class KafkaServer(
config,
forwardingManager,
brokerFeatures,
metadataCache
metadataCache,
None
)
// Create and start the socket server acceptor threads so that the bound port is known.

View File

@ -87,7 +87,9 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
val line =
if (apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED$terminator"
else s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
} else {

View File

@ -74,6 +74,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
apiVersionsResponse: ApiVersionsResponse,
listenerName: ListenerName = cluster.clientListener(),
enableUnstableLastVersion: Boolean = false,
clientTelemetryEnabled: Boolean = false,
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (cluster.isKRaftTest && apiVersion >= 3) {
@ -100,7 +101,8 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
ApiMessageType.ListenerType.BROKER,
RecordVersion.current,
NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions(),
enableUnstableLastVersion
enableUnstableLastVersion,
clientTelemetryEnabled
)
}

View File

@ -95,6 +95,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
@ -129,6 +130,7 @@ class KafkaApisTest {
private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
private val fetchManager: FetchManager = mock(classOf[FetchManager])
private val clientMetricsManager: ClientMetricsManager = mock(classOf[ClientMetricsManager])
private val brokerTopicStats = new BrokerTopicStats
private val clusterId = "clusterId"
private val time = new MockTime
@ -196,6 +198,8 @@ class KafkaApisTest {
false,
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None
new KafkaApis(
requestChannel = requestChannel,
metadataSupport = metadataSupport,
@ -216,7 +220,7 @@ class KafkaApisTest {
time = time,
tokenManager = null,
apiVersionManager = apiVersionManager,
clientMetricsManager = null)
clientMetricsManager = clientMetricsManagerOpt)
}
@Test
@ -6840,18 +6844,39 @@ class KafkaApisTest {
}
@Test
def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = {
val data = new GetTelemetrySubscriptionsRequestData()
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build())
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedResponse = new GetTelemetrySubscriptionsResponseData()
expectedResponse.setErrorCode(errorCode)
when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](),
any[RequestContext]())).thenReturn(new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
val expectedResponse = new GetTelemetrySubscriptionsResponseData()
assertEquals(expectedResponse, response.data)
}
@Test
def testGetTelemetrySubscriptionsWithException(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build())
// A telemetry receiver is configured, but the manager throws while processing
// the request; KafkaApis should translate the exception into an
// INVALID_REQUEST error response rather than propagating it.
when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](),
any[RequestContext]())).thenThrow(new RuntimeException("test"))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
val expectedResponse = new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code)
assertEquals(expectedResponse, response.data)
}
@ -6867,18 +6892,34 @@ class KafkaApisTest {
}
@Test
def testPushTelemetryUnsupportedVersionForKRaftClusters(): Unit = {
val data = new PushTelemetryRequestData()
def testPushTelemetry(): Unit = {
val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build())
val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedResponse = new PushTelemetryResponseData()
expectedResponse.setErrorCode(errorCode)
when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]()))
.thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData()))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.NONE.code)
assertEquals(expectedResponse, response.data)
}
@Test
def testPushTelemetryWithException(): Unit = {
val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build())
// A telemetry receiver is configured, but the manager throws while processing
// the push request; KafkaApis should respond with INVALID_REQUEST rather than
// propagating the exception.
when(clientMetricsManager.isTelemetryReceiverConfigured()).thenReturn(true)
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]()))
.thenThrow(new RuntimeException("test"))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code)
assertEquals(expectedResponse, response.data)
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -32,7 +33,7 @@ public class ClientMetricsReceiverPlugin {
private final List<ClientTelemetryReceiver> receivers;
public ClientMetricsReceiverPlugin() {
this.receivers = new ArrayList<>();
this.receivers = Collections.synchronizedList(new ArrayList<>());
}
public boolean isEmpty() {