diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index 82f6ffae188..08f4fe5cfc1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -42,14 +42,13 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Collections; @@ -62,12 +61,11 @@ import java.util.function.Function; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +@Timeout(600) @Category({IntegrationTest.class}) public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private final static int NUM_BROKERS = 1; public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); @@ -88,7 +86,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { private final static Properties PRODUCER_CONFIG_2 = new Properties(); private final static Properties PRODUCER_CONFIG_3 = new Properties(); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); //Use multiple partitions to ensure distribution of keys. @@ -149,12 +147,12 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Before + @BeforeEach public void before() throws IOException { final String stateDirBasePath = TestUtils.tempDirectory().getPath(); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1"); @@ -162,7 +160,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3"); } - @After + @AfterEach public void after() throws IOException { if (streams != null) { streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java index c95b37ae855..ff5b67965f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java @@ -34,15 +34,14 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Arrays; @@ -55,35 +54,30 @@ import java.util.function.Predicate; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +@Timeout(600) @Category({IntegrationTest.class}) public class KTableKTableForeignKeyJoinDistributedTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; private static final String LEFT_TABLE = "left_table"; private static final String RIGHT_TABLE = "right_table"; private static final String OUTPUT = "output-topic"; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); CLUSTER.createTopic(INPUT_TOPIC, 2, 1); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } private static final Properties CONSUMER_CONFIG = new Properties(); - @Rule - public TestName testName = new TestName(); - - private static final String INPUT_TOPIC = "input-topic"; private KafkaStreams client1; @@ -92,7 +86,7 @@ public class KTableKTableForeignKeyJoinDistributedTest { private volatile boolean client1IsOk = false; private volatile boolean client2IsOk = false; - @Before + @BeforeEach public void setupTopics() throws InterruptedException { CLUSTER.createTopic(LEFT_TABLE, 1, 1); CLUSTER.createTopic(RIGHT_TABLE, 1, 1); @@ -125,7 +119,7 @@ public class KTableKTableForeignKeyJoinDistributedTest { CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } - @After + @AfterEach public void after() { client1.close(); client2.close(); @@ -133,8 +127,8 @@ public class KTableKTableForeignKeyJoinDistributedTest { quietlyCleanStateAfterTest(CLUSTER, client2); } - public Properties getStreamsConfiguration() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + public Properties getStreamsConfiguration(final TestInfo testInfo) { + final String safeTestName = safeUniqueTestName(getClass(), testInfo); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -164,9 +158,9 @@ public class KTableKTableForeignKeyJoinDistributedTest { } @Test - public void shouldBeInitializedWithDefaultSerde() throws Exception { - final Properties streamsConfiguration1 = getStreamsConfiguration(); - final Properties streamsConfiguration2 = getStreamsConfiguration(); + public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) throws Exception { + final Properties streamsConfiguration1 = getStreamsConfiguration(testInfo); + final Properties streamsConfiguration2 = getStreamsConfiguration(testInfo); //Each streams client needs to have it's own StreamsBuilder in order to simulate //a truly distributed run diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java index 3b7251d6da1..363fed3b8e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java @@ -39,15 +39,14 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,20 +72,19 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertTrue; +@Timeout(600) @Category({IntegrationTest.class}) public class LagFetchIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } @@ -102,12 +100,9 @@ public class LagFetchIntegrationTest { private String outputTopicName; private String stateStoreName; - @Rule - public TestName testName = new TestName(); - - @Before - public void before() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + @BeforeEach + public void before(final TestInfo testInfo) { + final String safeTestName = safeUniqueTestName(getClass(), testInfo); inputTopicName = "input-topic-" + safeTestName; outputTopicName = "output-topic-" + safeTestName; stateStoreName = "lagfetch-test-store" + safeTestName; @@ -128,7 +123,7 @@ public class LagFetchIntegrationTest { consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); } - @After + @AfterEach public void shutdown() throws Exception { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } @@ -318,7 +313,7 @@ public class LagFetchIntegrationTest { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder()) .map(Path::toFile) - .forEach(f -> assertTrue("Some state " + f + " could not be deleted", f.delete())); + .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted")); // wait till the lag goes down to 0 final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 5fb5188e5f2..8821a0cebec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -43,16 +43,14 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -65,23 +63,24 @@ import java.util.stream.Collectors; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +@Timeout(600) @Category({IntegrationTest.class}) @SuppressWarnings("deprecation") public class MetricsIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; private static final int NUM_THREADS = 2; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } @@ -242,15 +241,12 @@ public class MetricsIntegrationTest { private String appId; - @Rule - public TestName testName = new TestName(); - - @Before - public void before() throws InterruptedException { + @BeforeEach + public void before(final TestInfo testInfo) throws InterruptedException { builder = new StreamsBuilder(); CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); appId = "app-" + safeTestName; streamsConfiguration = new Properties(); @@ -263,7 +259,7 @@ public class MetricsIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); } - @After + @AfterEach public void after() throws InterruptedException { CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4); } @@ -752,9 +748,9 @@ public class MetricsIntegrationTest { final List metrics = listMetric.stream() .filter(m -> m.metricName().name().equals(metricName)) .collect(Collectors.toList()); - Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size()); + assertEquals(numMetric, metrics.size(), "Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size()); for (final Metric m : metrics) { - Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue()); + assertNotNull(m.metricValue(), "Metric:'" + m.metricName() + "' must be not null"); } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index e07a8886900..d9441d8ca69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -28,14 +28,13 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.IntegrationTest; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.HashMap; @@ -47,10 +46,9 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +@Timeout(600) @Category({IntegrationTest.class}) public class MetricsReporterIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); @@ -62,24 +60,21 @@ public class MetricsReporterIntegrationTest { private StreamsBuilder builder; private Properties streamsConfiguration; - @Rule - public TestName testName = new TestName(); - - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Before - public void before() throws InterruptedException { + @BeforeEach + public void before(final TestInfo testInfo) throws InterruptedException { builder = new StreamsBuilder(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); final String appId = "app-" + safeTestName; streamsConfiguration = new Properties(); @@ -116,7 +111,6 @@ public class MetricsReporterIntegrationTest { @Override public void close() { } - } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index d1659cbc9d4..9d4f270ed38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -65,15 +65,14 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.time.Duration; import java.util.Collection; @@ -105,10 +104,9 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +@Timeout(600) @Category(IntegrationTest.class) public class NamedTopologyIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45); public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @@ -149,7 +147,7 @@ public class NamedTopologyIntegrationTest { private static Properties producerConfig; private static Properties consumerConfig; - @BeforeClass + @BeforeAll public static void initializeClusterAndStandardTopics() throws Exception { CLUSTER.start(); @@ -165,13 +163,11 @@ public class NamedTopologyIntegrationTest { produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public final TestName testName = new TestName(); private String appId; private String changelog1; private String changelog2; @@ -219,9 +215,9 @@ public class NamedTopologyIntegrationTest { return streamsConfiguration; } - @Before - public void setup() throws Exception { - appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testName); + @BeforeEach + public void setup(final TestInfo testInfo) throws Exception { + appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testInfo); changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog"; changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog"; changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog"; @@ -246,7 +242,7 @@ public class NamedTopologyIntegrationTest { topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2); } - @After + @AfterEach public void shutdown() throws Exception { if (streams != null) { streams.close(Duration.ofSeconds(30)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 2e709af0b11..5ebe2d27575 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -55,22 +55,20 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Timeout(600) @Category(IntegrationTest.class) public class OptimizedKTableIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class); private static final int NUM_BROKERS = 1; private static int port = 0; @@ -79,29 +77,25 @@ public class OptimizedKTableIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - - @Rule - public final TestName testName = new TestName(); - private final List streamsToCleanup = new ArrayList<>(); private final MockTime mockTime = CLUSTER.time; - @Before + @BeforeEach public void before() throws InterruptedException { CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1); } - @After + @AfterEach public void after() { for (final KafkaStreams kafkaStreams : streamsToCleanup) { kafkaStreams.close(); @@ -109,7 +103,7 @@ public class OptimizedKTableIntegrationTest { } @Test - public void shouldApplyUpdatesToStandbyStore() throws Exception { + public void shouldApplyUpdatesToStandbyStore(final TestInfo testInfo) throws Exception { final int batch1NumMessages = 100; final int batch2NumMessages = 100; final int key = 1; @@ -123,8 +117,8 @@ public class OptimizedKTableIntegrationTest { .toStream() .peek((k, v) -> semaphore.release()); - final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); - final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(testInfo)); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(testInfo)); final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); @@ -201,8 +195,8 @@ public class OptimizedKTableIntegrationTest { return streams; } - private Properties streamsConfiguration() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + private Properties streamsConfiguration(final TestInfo testInfo) { + final String safeTestName = safeUniqueTestName(getClass(), testInfo); final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 26720a00215..3b4db318eea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -37,14 +37,13 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -55,10 +54,9 @@ import java.util.List; import java.util.Properties; import java.util.Set; +@Timeout(600) @Category({IntegrationTest.class}) public class PurgeRepartitionTopicIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; private static final String INPUT_TOPIC = "input-stream"; @@ -77,13 +75,13 @@ public class PurgeRepartitionTopicIntegrationTest { } }); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); CLUSTER.createTopic(INPUT_TOPIC, 1, 1); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } @@ -154,7 +152,7 @@ public class PurgeRepartitionTopicIntegrationTest { } } - @Before + @BeforeEach public void setup() { // create admin client for verification final Properties adminConfig = new Properties(); @@ -181,7 +179,7 @@ public class PurgeRepartitionTopicIntegrationTest { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time); } - @After + @AfterEach public void shutdown() { if (kafkaStreams != null) { kafkaStreams.close(Duration.ofSeconds(30)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 82409af50f8..d0f04a1265e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -61,15 +61,14 @@ import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.NoRetryException; import org.apache.kafka.test.TestUtils; import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,15 +115,14 @@ import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +@Timeout(600) @Category({IntegrationTest.class}) @SuppressWarnings("deprecation") public class QueryableStateIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class); private static final long DEFAULT_TIMEOUT_MS = 120 * 1000; @@ -133,12 +131,12 @@ public class QueryableStateIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } @@ -164,8 +162,8 @@ public class QueryableStateIntegrationTest { private Comparator> stringComparator; private Comparator> stringLongComparator; - private void createTopics() throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); + private void createTopics(final TestInfo testInfo) throws Exception { + final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamOne = streamOne + "-" + safeTestName; streamConcurrent = streamConcurrent + "-" + safeTestName; streamThree = streamThree + "-" + safeTestName; @@ -216,14 +214,11 @@ public class QueryableStateIntegrationTest { return input; } - @Rule - public TestName testName = new TestName(); - - @Before - public void before() throws Exception { - createTopics(); + @BeforeEach + public void before(final TestInfo testInfo) throws Exception { + createTopics(testInfo); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -244,7 +239,7 @@ public class QueryableStateIntegrationTest { } } - @After + @AfterEach public void shutdown() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(ofSeconds(30)); @@ -450,8 +445,8 @@ public class QueryableStateIntegrationTest { } @Test - public void shouldRejectNonExistentStoreName() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + public void shouldRejectNonExistentStoreName(final TestInfo testInfo) throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); final String input = uniqueTestName + "-input"; final String storeName = uniqueTestName + "-input-table"; @@ -465,7 +460,7 @@ public class QueryableStateIntegrationTest { ); final Properties properties = mkProperties(mkMap( - mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testName)), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testInfo)), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) )); @@ -488,8 +483,8 @@ public class QueryableStateIntegrationTest { } @Test - public void shouldRejectWronglyTypedStore() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); final String input = uniqueTestName + "-input"; final String storeName = uniqueTestName + "-input-table"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 1a22c90fcf1..4dab0f3baec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -47,15 +47,14 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -81,14 +80,13 @@ import static org.hamcrest.Matchers.greaterThan; * End-to-end integration test based on using regex and named topics for creating sources, using * an embedded Kafka cluster. */ +@Timeout(600) @Category({IntegrationTest.class}) public class RegexSourceIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); CLUSTER.createTopics( @@ -104,7 +102,7 @@ public class RegexSourceIntegrationTest { CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } @@ -129,8 +127,8 @@ public class RegexSourceIntegrationTest { private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0); private String outputTopic; - @Before - public void setUp() throws InterruptedException { + @BeforeEach + public void setUp(final TestInfo testInfo) throws InterruptedException { outputTopic = createTopic(topicSuffixGenerator.incrementAndGet()); final Properties properties = new Properties(); properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); @@ -141,7 +139,7 @@ public class RegexSourceIntegrationTest { properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); streamsConfiguration = StreamsTestUtils.getStreamsConfig( - IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()), + IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, testInfo), CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, @@ -149,7 +147,7 @@ public class RegexSourceIntegrationTest { ); } - @After + @AfterEach public void tearDown() throws IOException { if (streams != null) { streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 9f745072ef2..ee7cbe462b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -25,11 +25,10 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.tests.SmokeTestClient; import org.apache.kafka.streams.tests.SmokeTestDriver; -import org.junit.Assert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -42,11 +41,11 @@ import java.util.Set; import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; import static org.apache.kafka.streams.tests.SmokeTestDriver.verify; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") public class SmokeTestDriverIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); @BeforeAll @@ -175,6 +174,6 @@ public class SmokeTestDriverIntegrationTest { driver.exception().printStackTrace(); throw new AssertionError(driver.exception()); } - Assert.assertTrue(driver.result().result(), driver.result().passed()); + assertTrue(driver.result().passed(), driver.result().result()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index d59a7160040..d84d267b448 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -34,14 +34,13 @@ import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Properties; @@ -49,28 +48,24 @@ import java.util.function.Predicate; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +@Timeout(600) @Category({IntegrationTest.class}) public class StandbyTaskCreationIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); private static final int NUM_BROKERS = 1; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); CLUSTER.createTopic(INPUT_TOPIC, 2, 1); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public TestName testName = new TestName(); - private static final String INPUT_TOPIC = "input-topic"; private KafkaStreams client1; @@ -78,14 +73,14 @@ public class StandbyTaskCreationIntegrationTest { private volatile boolean client1IsOk = false; private volatile boolean client2IsOk = false; - @After + @AfterEach public void after() { client1.close(); client2.close(); } - private Properties streamsConfiguration() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + private Properties streamsConfiguration(final TestInfo testInfo) { + final String safeTestName = safeUniqueTestName(getClass(), testInfo); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -98,7 +93,7 @@ public class StandbyTaskCreationIntegrationTest { @Test @SuppressWarnings("deprecation") - public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception { + public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo testInfo) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final String stateStoreName = "myTransformState"; final StoreBuilder> keyValueStoreBuilder = @@ -121,7 +116,7 @@ public class StandbyTaskCreationIntegrationTest { }, stateStoreName); final Topology topology = builder.build(); - createClients(topology, streamsConfiguration(), topology, streamsConfiguration()); + createClients(topology, streamsConfiguration(testInfo), topology, streamsConfiguration(testInfo)); setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty()); @@ -133,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest { } @Test - public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception { - final Properties streamsConfiguration1 = streamsConfiguration(); + public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo testInfo) throws Exception { + final Properties streamsConfiguration1 = streamsConfiguration(testInfo); streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); - final Properties streamsConfiguration2 = streamsConfiguration(); + final Properties streamsConfiguration2 = streamsConfiguration(testInfo); streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder();