KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166)
CI / build (push) Has been cancelled Details

The `AdminClient` adds a telemetry reporter to the metrics reporters
list in the constructor.  The problem is that the reporter was already
added in the `createInternal` method.  In the `createInternal` method
call, the `clientTelemetryReporter` is added to a
`List<MetricReporters>` which is passed to the `Metrics` object, will
get closed when `Metrics.close()` is called.  But adding a reporter to
the reporters list in the constructor is not used by the `Metrics`
object and hence doesn't get closed, causing a memory leak.

All related tests pass after this change.

Reviewers: Apoorv Mittal <apoorvmittal10@apache.org>, Matthias J. Sax
 <matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>,
 Jhen-Yung Hsu <jhenyunghsu@gmail.com>
This commit is contained in:
Bill Bejeck 2025-07-14 20:19:16 -04:00 committed by Bill
parent 793294bd2e
commit f35f94b3e6
1 changed files with 3 additions and 3 deletions

View File

@ -579,10 +579,12 @@ public class KafkaAdminClient extends AdminClient {
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(reporters::add);
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
metrics = new Metrics(new MetricConfig(), reporters, time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext, clientTelemetryReporter);
@ -627,9 +629,7 @@ public class KafkaAdminClient extends AdminClient {
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);