From dd82542493e2350bb6b1f01b1f716656b02804da Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 14 Jul 2025 20:19:16 -0400 Subject: [PATCH] KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166) The `AdminClient` adds a telemetry reporter to the metrics reporters list in the constructor. The problem is that the reporter was already added in the `createInternal` method. In the `createInternal` method call, the `clientTelemetryReporter` is added to a `List` which is passed to the `Metrics` object, will get closed when `Metrics.close()` is called. But adding a reporter to the reporters list in the constructor is not used by the `Metrics` object and hence doesn't get closed, causing a memory leak. All related tests pass after this change. Reviewers: Apoorv Mittal , Matthias J. Sax , Chia-Ping Tsai , Jhen-Yung Hsu --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e1be4304950..ac2d67d2022 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -579,10 +579,12 @@ public class KafkaAdminClient extends AdminClient { Time time) { Metrics metrics = null; String clientId = generateClientId(config); + List reporters = CommonClientConfigs.metricsReporters(clientId, config); Optional clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); + clientTelemetryReporter.ifPresent(reporters::add); try { - metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time); + metrics = new Metrics(new MetricConfig(), reporters, time); LogContext logContext = createLogContext(clientId); return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, client, null, logContext, clientTelemetryReporter); @@ -627,9 +629,7 @@ public class KafkaAdminClient extends AdminClient { CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, retryBackoffMaxMs, CommonClientConfigs.RETRY_BACKOFF_JITTER); - List reporters = CommonClientConfigs.metricsReporters(this.clientId, config); this.clientTelemetryReporter = clientTelemetryReporter; - this.clientTelemetryReporter.ifPresent(reporters::add); this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG)); this.partitionLeaderCache = new HashMap<>(); this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);