MINOR: KafkaProducerTest - Fix resource leakage and replace explicit invocation of close() method with try with resources (#18678)

Reviewers: Divij Vaidya <diviv@amazon.com>, Greg Harris <greg.harris@aiven.io>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Pramithas Dhakal 2025-01-30 17:19:57 +05:45 committed by GitHub
parent c0b5d3334a
commit aa27df9396
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 130 additions and 92 deletions

View File

@ -95,6 +95,7 @@ import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@ -135,6 +136,7 @@ import javax.management.ObjectName;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.producer.KafkaProducer.NETWORK_THREAD_PREFIX;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -205,6 +207,12 @@ public class KafkaProducerTest {
this.testInfo = testInfo;
}
@AfterEach
public void detectLeaks() {
    // Runs after every test: assert the Kafka producer's network thread
    // (a daemon thread named with NETWORK_THREAD_PREFIX) was shut down,
    // i.e. the test did not leak an unclosed producer.
    // Use the primitive literal `true` — the parameter is a primitive
    // boolean, so passing Boolean.TRUE forces a pointless unboxing.
    TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, true);
}
@Test
public void testOverwriteAcksAndRetriesForIdempotentProducers() {
Properties props = new Properties();
@ -468,16 +476,14 @@ public class KafkaProducerTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(2, producer.metrics.reporters().size());
assertEquals(2, producer.metrics.reporters().size());
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
producer.close();
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
}
}
@Test
@ -486,9 +492,9 @@ public class KafkaProducerTest {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertTrue(producer.metrics.reporters().isEmpty());
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertTrue(producer.metrics.reporters().isEmpty());
}
}
@Test
@ -497,10 +503,10 @@ public class KafkaProducerTest {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
}
}
@Test
@ -508,10 +514,10 @@ public class KafkaProducerTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
}
}
@Test
@ -523,15 +529,24 @@ public class KafkaProducerTest {
@Test
public void testNoSerializerProvided() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
assertThrows(ConfigException.class, () -> new KafkaProducer(producerProps));
assertThrows(ConfigException.class, () -> {
try (KafkaProducer<?, ?> producer = new KafkaProducer<>(producerProps)) {
// KafkaProducer will be closed automatically after the block
}
});
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
// Invalid value null for configuration key.serializer: must be non-null.
assertThrows(ConfigException.class, () -> new KafkaProducer<String, String>(configs));
assertThrows(ConfigException.class, () -> {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
// KafkaProducer will be closed automatically after the block
}
});
}
@Test
@ -604,15 +619,14 @@ public class KafkaProducerTest {
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
// Cluster metadata will only be updated on calling onSend.
assertNull(MockProducerInterceptor.CLUSTER_META.get());
producer.close();
// Cluster metadata will only be updated on calling onSend.
assertNull(MockProducerInterceptor.CLUSTER_META.get());
}
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
} finally {
@ -652,12 +666,12 @@ public class KafkaProducerTest {
MockPartitioner.resetCounters();
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
}
producer.close();
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
} finally {
@ -1934,11 +1948,10 @@ public class KafkaProducerTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
}
}
@Test
@ -1947,11 +1960,10 @@ public class KafkaProducerTest {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
}
}
private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
@ -2227,15 +2239,14 @@ public class KafkaProducerTest {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put("client.id", "client-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
MetricName testMetricName = producer.metrics.metricName("test-metric",
"grp1", "test metric");
producer.metrics.addMetric(testMetricName, new Avg());
assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
MetricName testMetricName = producer.metrics.metricName("test-metric",
"grp1", "test metric");
producer.metrics.addMetric(testMetricName, new Avg());
assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
}
}
private static ProducerMetadata newMetadata(long refreshBackoffMs, long refreshBackoffMaxMs, long expirationMs) {
@ -2646,14 +2657,14 @@ public class KafkaProducerTest {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric));
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric));
Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name)));
Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name)));
}
}
@Test
@ -2661,12 +2672,12 @@ public class KafkaProducerTest {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
//Metrics never registered but removed should not cause an error
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
//Metrics never registered but removed should not cause an error
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric)));
}
}
@Test
@ -2675,12 +2686,13 @@ public class KafkaProducerTest {
appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetricToAdd);
final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetricToAdd);
final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}
}
@ -2690,12 +2702,13 @@ public class KafkaProducerTest {
appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetricToRemove);
final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetricToRemove);
final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}
}
@ -2708,12 +2721,13 @@ public class KafkaProducerTest {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetric);
// This test would fail without the check as the existing metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetric);
// This test would fail without the check as the existing metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
}
}
}
@ -2726,12 +2740,13 @@ public class KafkaProducerTest {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetric);
// This test would fail without the check as the existing metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetric);
// This test would fail without the check as the existing metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
}
}
}

View File

@ -70,6 +70,7 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -153,6 +154,28 @@ public class TestUtils {
return new MetadataSnapshot("kafka-cluster", nodesById, partsMetadatas, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap());
}
/**
 * Asserts that there are no leaked threads with a specified name prefix and daemon status.
 * This method checks all threads in the JVM, filters them by the provided thread name prefix
 * and daemon status, and verifies that no matching threads are alive.
 * If any matching threads are found, the test fails with a message listing their names.
 *
 * @param threadName The prefix of the thread names to check. Only threads whose names
 *                   start with this prefix will be considered.
 * @param isDaemon   The daemon status to check. Only threads with the specified
 *                   daemon status (either true for daemon threads or false for non-daemon threads)
 *                   will be considered.
 *
 * @throws AssertionError If any thread with the specified name prefix and daemon status is found and is alive.
 */
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) {
    List<Thread> leaked = Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName))
        .collect(Collectors.toList());
    // Name the offending threads in the failure message so a leak is diagnosable
    // from CI output alone; a bare "expected 0 but was N" says nothing useful.
    // The message supplier is only evaluated on failure.
    assertEquals(0, leaked.size(),
        () -> "Found leaked threads: " +
            leaked.stream().map(Thread::getName).collect(Collectors.joining(", ")));
}
/**
* Test utility function to get MetadataSnapshot of cluster with configured, and 0 partitions.
* @param nodes number of nodes in the cluster.