diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 465c45c6150..6ad4e4afc6e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; @@ -138,7 +139,7 @@ import static org.mockito.Mockito.when; public class KafkaStreamsTest { private static final int NUM_THREADS = 2; - private static final String APPLICATION_ID = "appId"; + private static final String APPLICATION_ID = "appId-"; private static final String CLIENT_ID = "test-client"; private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30); @@ -180,14 +181,14 @@ public class KafkaStreamsTest { } @BeforeEach - public void before() throws Exception { + public void before(final TestInfo testInfo) throws Exception { time = new MockTime(); supplier = new MockClientSupplier(); supplier.setCluster(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)))); adminClient = (MockAdminClient) supplier.getAdmin(null); streamsStateListener = new StateListenerStub(); props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID + safeUniqueTestName(testInfo)); props.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018"); props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); @@ -851,13 +852,14 @@ public class KafkaStreamsTest { prepareStreams(); prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - try { - streams.cleanUp(); - streams.start(); - } finally { - streams.close(); - streams.cleanUp(); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + try { + streams.cleanUp(); + streams.start(); + } finally { + streams.close(); + streams.cleanUp(); + } } } @@ -913,19 +915,19 @@ public class KafkaStreamsTest { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); prepareTerminableThread(streamThreadOne); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); - streams.close(Duration.ZERO); - assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); - assertThrows(IllegalStateException.class, streams::cleanUp); - assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + streams.close(Duration.ZERO); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + } } - @SuppressWarnings("unchecked") @Test public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws Exception { prepareStreams(); @@ -963,19 +965,20 @@ public class KafkaStreamsTest { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); prepareTerminableThread(streamThreadOne); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - waitForCondition( + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); - final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); - closeOptions.timeout(Duration.ZERO); + final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); + closeOptions.timeout(Duration.ZERO); - streams.close(closeOptions); - assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); - assertThrows(IllegalStateException.class, streams::cleanUp); - assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + streams.close(closeOptions); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + } } @Test @@ -1020,12 +1023,12 @@ public class KafkaStreamsTest { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { - assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer())); + assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", new StringSerializer())); streams.start(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION); streams.close(); waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION); - assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer())); + assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", new StringSerializer())); } } @@ -1037,7 +1040,7 @@ public class KafkaStreamsTest { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", Serdes.String().serializer())); + assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", new StringSerializer())); } } @@ -1191,7 +1194,6 @@ public class KafkaStreamsTest { } } - @SuppressWarnings("unchecked") @Test public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() throws Exception { prepareStreams(); @@ -1229,7 +1231,6 @@ public class KafkaStreamsTest { } } - @SuppressWarnings("unchecked") @Test public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception { prepareStreams(); @@ -1291,7 +1292,9 @@ public class KafkaStreamsTest { final StreamsConfig mockConfig = spy(config); when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier); - new KafkaStreams(getBuilderWithSource().build(), mockConfig); + try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig)) { + // no-op + } // It's called once in above when mock verify(mockConfig, times(2)).getKafkaClientSupplier(); } @@ -1308,7 +1311,9 @@ public class KafkaStreamsTest { final StreamsConfig mockConfig = spy(config); when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier); - new KafkaStreams(getBuilderWithSource().build(), mockConfig, time); + try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, time)) { + // no-op + } // It's called once in above when mock verify(mockConfig, times(2)).getKafkaClientSupplier(); } @@ -1324,7 +1329,9 @@ public class KafkaStreamsTest { final StreamsConfig config = new StreamsConfig(props); final StreamsConfig mockConfig = spy(config); - new KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier); + try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier)) { + // no-op + } // It's called once in above when mock verify(mockConfig, times(0)).getKafkaClientSupplier(); } @@ -1389,7 +1396,7 @@ public class KafkaStreamsTest { final String inputTopic = safeTestName + "-input"; final String outputTopic = safeTestName + "-output"; final Topology topology = new Topology(); - topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic) + topology.addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic) .addProcessor("process", () -> new Processor() { private ProcessorContext context; @@ -1444,8 +1451,7 @@ public class KafkaStreamsTest { @Test public void shouldThrowTopologyExceptionOnEmptyTopology() { prepareStreams(); - try { - new KafkaStreams(new StreamsBuilder().build(), props, supplier, time); + try (final KafkaStreams ignored = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) { fail("Should have thrown TopologyException"); } catch (final TopologyException e) { assertThat( @@ -1831,7 +1837,7 @@ public class KafkaStreamsTest { Serdes.String(), Serdes.Long()); final Topology topology = new Topology(); - topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic) + topology.addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic) .addProcessor("process", () -> new Processor() { private ProcessorContext context; @@ -1858,8 +1864,8 @@ public class KafkaStreamsTest { topology.addGlobalStore( globalStoreBuilder, "global", - Serdes.String().deserializer(), - Serdes.String().deserializer(), + new StringDeserializer(), + new StringDeserializer(), globalTopicName, globalTopicName + "-processor", new MockProcessorSupplier<>());