KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (#12301)

This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14003. It is the second in a series of pull requests that migrate Kafka Streams tests from JUnit 4 to JUnit 5. Across the touched integration tests, the JUnit 4 lifecycle annotations, the @Rule-based Timeout and TestName, and org.junit.Assert are replaced with their JUnit 5 equivalents: @BeforeAll/@AfterAll/@BeforeEach/@AfterEach, a class-level @Timeout, injected TestInfo parameters, and org.junit.jupiter.api.Assertions. The recurring patterns are sketched inline below, starting after the file summary.

Reviewer: Bruno Cadonna <cadonna@apache.org>
Author: Christo Lolov (committed by GitHub)
Date:   2023-01-11 10:26:48 +02:00
Parent: 8ac644d2b1
Commit: 78d4458b94
12 changed files with 177 additions and 225 deletions
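
The lifecycle changes repeated across the files below follow the standard JUnit 4 to JUnit 5 mapping. A minimal sketch of the resulting JUnit 5 shape, with the replaced JUnit 4 annotations noted in comments; the class and methods are illustrative, not taken from this PR:

    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    class LifecycleExampleTest {          // hypothetical class, not part of this PR

        @BeforeAll                        // was @BeforeClass; must be static
        static void startCluster() { /* start shared resources once per class */ }

        @AfterAll                         // was @AfterClass; must be static
        static void closeCluster() { /* stop shared resources once per class */ }

        @BeforeEach                       // was @Before
        void before() { /* per-test setup */ }

        @AfterEach                        // was @After
        void after() { /* per-test cleanup */ }

        @Test                             // org.junit.Test -> org.junit.jupiter.api.Test
        void shouldDoSomething() { }
    }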

KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java

@ -42,14 +42,13 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Collections;
@ -62,12 +61,11 @@ import java.util.function.Function;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@ -88,7 +86,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static Properties PRODUCER_CONFIG_2 = new Properties();
private final static Properties PRODUCER_CONFIG_3 = new Properties();
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
@ -149,12 +147,12 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@Before
@BeforeEach
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
@ -162,7 +160,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}
@After
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
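
The timeout handling above recurs in every file of this PR: the JUnit 4 field "@Rule public Timeout globalTimeout = Timeout.seconds(600);" is dropped in favour of the class-level @Timeout(600) annotation, whose default unit is seconds and which applies to each test method individually. A minimal sketch under those assumptions (the class is illustrative, not from this PR):

    import java.util.concurrent.TimeUnit;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.Timeout;

    // @Timeout(600) is equivalent to the explicit form below, since SECONDS is the default unit.
    @Timeout(value = 600, unit = TimeUnit.SECONDS)
    class TimeoutExampleTest {            // hypothetical class, not part of this PR

        @Test
        void finishesWithinTenMinutes() throws InterruptedException {
            Thread.sleep(100);            // well under the 600-second limit
        }
    }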

KTableKTableForeignKeyJoinDistributedTest.java

@ -34,15 +34,14 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Arrays;
@ -55,35 +54,30 @@ import java.util.function.Predicate;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
private static final Properties CONSUMER_CONFIG = new Properties();
@Rule
public TestName testName = new TestName();
private static final String INPUT_TOPIC = "input-topic";
private KafkaStreams client1;
@ -92,7 +86,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
@Before
@BeforeEach
public void setupTopics() throws InterruptedException {
CLUSTER.createTopic(LEFT_TABLE, 1, 1);
CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
@ -125,7 +119,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
@After
@AfterEach
public void after() {
client1.close();
client2.close();
@ -133,8 +127,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
quietlyCleanStateAfterTest(CLUSTER, client2);
}
public Properties getStreamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
public Properties getStreamsConfiguration(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@ -164,9 +158,9 @@ public class KTableKTableForeignKeyJoinDistributedTest {
}
@Test
public void shouldBeInitializedWithDefaultSerde() throws Exception {
final Properties streamsConfiguration1 = getStreamsConfiguration();
final Properties streamsConfiguration2 = getStreamsConfiguration();
public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) throws Exception {
final Properties streamsConfiguration1 = getStreamsConfiguration(testInfo);
final Properties streamsConfiguration2 = getStreamsConfiguration(testInfo);
//Each streams client needs to have it's own StreamsBuilder in order to simulate
//a truly distributed run
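
Another pattern visible above is the replacement of the TestName rule with JUnit 5 parameter injection: getStreamsConfiguration and the test method now receive a TestInfo argument, which safeUniqueTestName consumes instead of the rule. A self-contained sketch of the injection mechanism; the helper logic below is illustrative, not the Kafka utility:

    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInfo;

    class TestInfoExampleTest {           // hypothetical class, not part of this PR

        private String safeTestName;

        @BeforeEach
        void setUp(final TestInfo testInfo) {   // JUnit 5 resolves TestInfo parameters automatically
            // Derive a per-test identifier (e.g. for application ids or topic names),
            // similar in spirit to IntegrationTestUtils.safeUniqueTestName(...).
            safeTestName = testInfo.getTestMethod()
                    .map(method -> method.getName())
                    .orElse(testInfo.getDisplayName())
                    .replaceAll("[^A-Za-z0-9]", "");
        }

        @Test
        void shouldUseUniqueName(final TestInfo testInfo) {
            System.out.println("appId = app-" + safeTestName + " (" + testInfo.getDisplayName() + ")");
        }
    }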

LagFetchIntegrationTest.java

@ -39,15 +39,14 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,20 +72,19 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@ -102,12 +100,9 @@ public class LagFetchIntegrationTest {
private String outputTopicName;
private String stateStoreName;
@Rule
public TestName testName = new TestName();
@Before
public void before() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
@BeforeEach
public void before(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
inputTopicName = "input-topic-" + safeTestName;
outputTopicName = "output-topic-" + safeTestName;
stateStoreName = "lagfetch-test-store" + safeTestName;
@ -128,7 +123,7 @@ public class LagFetchIntegrationTest {
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
}
@After
@AfterEach
public void shutdown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@ -318,7 +313,7 @@ public class LagFetchIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(f -> assertTrue("Some state " + f + " could not be deleted", f.delete()));
.forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
// wait till the lag goes down to 0
final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
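
A further recurring change, visible in the assertTrue call a few lines above, is the argument order of assertion messages: org.junit.Assert takes the failure message first, while org.junit.jupiter.api.Assertions takes it last. A small sketch of the pattern with illustrative values:

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.assertNotNull;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    class AssertionMessageExample {       // hypothetical class, not part of this PR

        void check(final boolean deleted, final int expected, final int actual, final Object metricValue) {
            // JUnit 4: assertTrue("message", condition)   -> JUnit 5: assertTrue(condition, "message")
            assertTrue(deleted, "Some state could not be deleted");
            // JUnit 4: assertEquals("message", exp, act)  -> JUnit 5: assertEquals(exp, act, "message")
            assertEquals(expected, actual, "Unexpected number of metrics");
            // JUnit 4: assertNotNull("message", value)    -> JUnit 5: assertNotNull(value, "message")
            assertNotNull(metricValue, "Metric value must not be null");
        }
    }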

MetricsIntegrationTest.java

@ -43,16 +43,14 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@ -65,23 +63,24 @@ import java.util.stream.Collectors;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@Timeout(600)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@ -242,15 +241,12 @@ public class MetricsIntegrationTest {
private String appId;
@Rule
public TestName testName = new TestName();
@Before
public void before() throws InterruptedException {
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
@ -263,7 +259,7 @@ public class MetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
}
@After
@AfterEach
public void after() throws InterruptedException {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
}
@ -752,9 +748,9 @@ public class MetricsIntegrationTest {
final List<Metric> metrics = listMetric.stream()
.filter(m -> m.metricName().name().equals(metricName))
.collect(Collectors.toList());
Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
assertEquals(numMetric, metrics.size(), "Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size());
for (final Metric m : metrics) {
Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
assertNotNull(m.metricValue(), "Metric:'" + m.metricName() + "' must be not null");
}
}
}

MetricsReporterIntegrationTest.java

@ -28,14 +28,13 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.HashMap;
@ -47,10 +46,9 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
@Category({IntegrationTest.class})
public class MetricsReporterIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@ -62,24 +60,21 @@ public class MetricsReporterIntegrationTest {
private StreamsBuilder builder;
private Properties streamsConfiguration;
@Rule
public TestName testName = new TestName();
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@Before
public void before() throws InterruptedException {
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final String appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
@ -116,7 +111,6 @@ public class MetricsReporterIntegrationTest {
@Override
public void close() {
}
}
@Test

NamedTopologyIntegrationTest.java

@ -65,15 +65,14 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.util.Collection;
@ -105,10 +104,9 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@Timeout(600)
@Category(IntegrationTest.class)
public class NamedTopologyIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@ -149,7 +147,7 @@ public class NamedTopologyIntegrationTest {
private static Properties producerConfig;
private static Properties consumerConfig;
@BeforeClass
@BeforeAll
public static void initializeClusterAndStandardTopics() throws Exception {
CLUSTER.start();
@ -165,13 +163,11 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TestName testName = new TestName();
private String appId;
private String changelog1;
private String changelog2;
@ -219,9 +215,9 @@ public class NamedTopologyIntegrationTest {
return streamsConfiguration;
}
@Before
public void setup() throws Exception {
appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testName);
@BeforeEach
public void setup(final TestInfo testInfo) throws Exception {
appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testInfo);
changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";
@ -246,7 +242,7 @@ public class NamedTopologyIntegrationTest {
topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2);
}
@After
@AfterEach
public void shutdown() throws Exception {
if (streams != null) {
streams.close(Duration.ofSeconds(30));

OptimizedKTableIntegrationTest.java

@ -55,22 +55,20 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Timeout(600)
@Category(IntegrationTest.class)
public class OptimizedKTableIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static int port = 0;
@ -79,29 +77,25 @@ public class OptimizedKTableIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public final TestName testName = new TestName();
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = CLUSTER.time;
@Before
@BeforeEach
public void before() throws InterruptedException {
CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
}
@After
@AfterEach
public void after() {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
@ -109,7 +103,7 @@ public class OptimizedKTableIntegrationTest {
}
@Test
public void shouldApplyUpdatesToStandbyStore() throws Exception {
public void shouldApplyUpdatesToStandbyStore(final TestInfo testInfo) throws Exception {
final int batch1NumMessages = 100;
final int batch2NumMessages = 100;
final int key = 1;
@ -123,8 +117,8 @@ public class OptimizedKTableIntegrationTest {
.toStream()
.peek((k, v) -> semaphore.release());
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(testInfo));
final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(testInfo));
final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
@ -201,8 +195,8 @@ public class OptimizedKTableIntegrationTest {
return streams;
}
private Properties streamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
private Properties streamsConfiguration(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);

PurgeRepartitionTopicIntegrationTest.java

@ -37,14 +37,13 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@ -55,10 +54,9 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
@Timeout(600)
@Category({IntegrationTest.class})
public class PurgeRepartitionTopicIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String INPUT_TOPIC = "input-stream";
@ -77,13 +75,13 @@ public class PurgeRepartitionTopicIntegrationTest {
}
});
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@ -154,7 +152,7 @@ public class PurgeRepartitionTopicIntegrationTest {
}
}
@Before
@BeforeEach
public void setup() {
// create admin client for verification
final Properties adminConfig = new Properties();
@ -181,7 +179,7 @@ public class PurgeRepartitionTopicIntegrationTest {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
}
@After
@AfterEach
public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30));

QueryableStateIntegrationTest.java

@ -61,15 +61,14 @@ import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -116,15 +115,14 @@ import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(600)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
private static final long DEFAULT_TIMEOUT_MS = 120 * 1000;
@ -133,12 +131,12 @@ public class QueryableStateIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@ -164,8 +162,8 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, String>> stringComparator;
private Comparator<KeyValue<String, Long>> stringLongComparator;
private void createTopics() throws Exception {
final String safeTestName = safeUniqueTestName(getClass(), testName);
private void createTopics(final TestInfo testInfo) throws Exception {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamOne = streamOne + "-" + safeTestName;
streamConcurrent = streamConcurrent + "-" + safeTestName;
streamThree = streamThree + "-" + safeTestName;
@ -216,14 +214,11 @@ public class QueryableStateIntegrationTest {
return input;
}
@Rule
public TestName testName = new TestName();
@Before
public void before() throws Exception {
createTopics();
@BeforeEach
public void before(final TestInfo testInfo) throws Exception {
createTopics(testInfo);
streamsConfiguration = new Properties();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@ -244,7 +239,7 @@ public class QueryableStateIntegrationTest {
}
}
@After
@AfterEach
public void shutdown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close(ofSeconds(30));
@ -450,8 +445,8 @@ public class QueryableStateIntegrationTest {
}
@Test
public void shouldRejectNonExistentStoreName() throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testName);
public void shouldRejectNonExistentStoreName(final TestInfo testInfo) throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
@ -465,7 +460,7 @@ public class QueryableStateIntegrationTest {
);
final Properties properties = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testName)),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testInfo)),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
@ -488,8 +483,8 @@ public class QueryableStateIntegrationTest {
}
@Test
public void shouldRejectWronglyTypedStore() throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testName);
public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";

RegexSourceIntegrationTest.java

@ -47,15 +47,14 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@ -81,14 +80,13 @@ import static org.hamcrest.Matchers.greaterThan;
* End-to-end integration test based on using regex and named topics for creating sources, using
* an embedded Kafka cluster.
*/
@Timeout(600)
@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(
@ -104,7 +102,7 @@ public class RegexSourceIntegrationTest {
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@ -129,8 +127,8 @@ public class RegexSourceIntegrationTest {
private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0);
private String outputTopic;
@Before
public void setUp() throws InterruptedException {
@BeforeEach
public void setUp(final TestInfo testInfo) throws InterruptedException {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
final Properties properties = new Properties();
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@ -141,7 +139,7 @@ public class RegexSourceIntegrationTest {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new TestName()),
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, testInfo),
CLUSTER.bootstrapServers(),
STRING_SERDE_CLASSNAME,
STRING_SERDE_CLASSNAME,
@ -149,7 +147,7 @@ public class RegexSourceIntegrationTest {
);
}
@After
@AfterEach
public void tearDown() throws IOException {
if (streams != null) {
streams.close();

SmokeTestDriverIntegrationTest.java

@ -25,11 +25,10 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -42,11 +41,11 @@ import java.util.Set;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeAll
@ -175,6 +174,6 @@ public class SmokeTestDriverIntegrationTest {
driver.exception().printStackTrace();
throw new AssertionError(driver.exception());
}
Assert.assertTrue(driver.result().result(), driver.result().passed());
assertTrue(driver.result().passed(), driver.result().result());
}
}
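
As shown in SmokeTestDriverIntegrationTest above, the class-based JUnit 4 category gives way to a string-based JUnit 5 tag. A minimal sketch; the test class is illustrative, while the "integration" tag value is the one used in this file:

    import org.junit.jupiter.api.Tag;
    import org.junit.jupiter.api.Test;

    // JUnit 4: @Category({IntegrationTest.class})  ->  JUnit 5: @Tag("integration")
    // Tagged tests can then be included or excluded by the build's test filtering.
    @Tag("integration")
    class TaggedExampleTest {             // hypothetical class, not part of this PR

        @Test
        void runsOnlyWhenIntegrationTestsAreSelected() { }
    }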

StandbyTaskCreationIntegrationTest.java

@ -34,14 +34,13 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Properties;
@ -49,28 +48,24 @@ import java.util.function.Predicate;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@Timeout(600)
@Category({IntegrationTest.class})
public class StandbyTaskCreationIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@Rule
public TestName testName = new TestName();
private static final String INPUT_TOPIC = "input-topic";
private KafkaStreams client1;
@ -78,14 +73,14 @@ public class StandbyTaskCreationIntegrationTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
@After
@AfterEach
public void after() {
client1.close();
client2.close();
}
private Properties streamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
private Properties streamsConfiguration(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@ -98,7 +93,7 @@ public class StandbyTaskCreationIntegrationTest {
@Test
@SuppressWarnings("deprecation")
public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception {
public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo testInfo) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
@ -121,7 +116,7 @@ public class StandbyTaskCreationIntegrationTest {
}, stateStoreName);
final Topology topology = builder.build();
createClients(topology, streamsConfiguration(), topology, streamsConfiguration());
createClients(topology, streamsConfiguration(testInfo), topology, streamsConfiguration(testInfo));
setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
@ -133,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest {
}
@Test
public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
final Properties streamsConfiguration1 = streamsConfiguration();
public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo testInfo) throws Exception {
final Properties streamsConfiguration1 = streamsConfiguration(testInfo);
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final Properties streamsConfiguration2 = streamsConfiguration();
final Properties streamsConfiguration2 = streamsConfiguration(testInfo);
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();