From fc6f8b659135a932bd91c1c6a982dbe241f1041d Mon Sep 17 00:00:00 2001 From: Ken Huang <100591800+m1a2st@users.noreply.github.com> Date: Sat, 15 Jun 2024 01:37:59 +0900 Subject: [PATCH] KAFKA-16921 [3/N] migrate connect module to junit 5 (#16330) Reviewers: Chia-Ping Tsai --- .../integration/BlockingConnectorTest.java | 47 +++-- .../ConnectIntegrationTestUtils.java | 43 ----- .../ConnectWorkerIntegrationTest.java | 85 +++++---- .../ConnectorClientPolicyIntegrationTest.java | 17 +- .../ConnectorRestartApiIntegrationTest.java | 71 ++++---- .../ConnectorTopicsIntegrationTest.java | 19 +- .../ConnectorValidationIntegrationTest.java | 15 +- .../ErrorHandlingIntegrationTest.java | 36 ++-- .../ExactlyOnceSourceIntegrationTest.java | 166 +++++++++--------- .../ExampleConnectIntegrationTest.java | 33 ++-- .../InternalTopicsIntegrationTest.java | 17 +- .../OffsetsApiIntegrationTest.java | 40 ++--- ...alanceSourceConnectorsIntegrationTest.java | 68 ++++--- .../RestExtensionIntegrationTest.java | 13 +- .../RestForwardingIntegrationTest.java | 24 +-- .../SessionedProtocolIntegrationTest.java | 17 +- .../SinkConnectorsIntegrationTest.java | 23 ++- .../SourceConnectorsIntegrationTest.java | 15 +- .../StandaloneWorkerIntegrationTest.java | 67 ++++--- .../integration/StartAndStopCounterTest.java | 21 ++- .../integration/StartAndStopLatchTest.java | 19 +- .../TransformationIntegrationTest.java | 33 ++-- 22 files changed, 411 insertions(+), 478 deletions(-) delete mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 532ab1baaf0..39e771c70e7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -38,12 +38,11 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.function.ThrowingRunnable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,16 +67,16 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests situations during which certain connector operations, such as start, validation, * configuration and others, take longer than expected. */ -@Category(IntegrationTest.class) +@Tag("integration") public class BlockingConnectorTest { private static final Logger log = LoggerFactory.getLogger(BlockingConnectorTest.class); @@ -121,7 +120,7 @@ public class BlockingConnectorTest { private EmbeddedConnectCluster connect; private ConnectorHandle normalConnectorHandle; - @Before + @BeforeEach public void setup() throws Exception { // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() @@ -136,7 +135,7 @@ public class BlockingConnectorTest { connect.start(); } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); @@ -354,26 +353,26 @@ public class BlockingConnectorTest { normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); } - private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request, String expectedTimeoutMessage) { + private void assertRequestTimesOut(String requestDescription, Executable request, String expectedTimeoutMessage) { // Artificially reduce the REST request timeout so that these don't take 90 seconds connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT); ConnectRestException exception = assertThrows( - "Should have failed to " + requestDescription, - ConnectRestException.class, request + ConnectRestException.class, request, + "Should have failed to " + requestDescription ); assertEquals( - "Should have gotten 500 error from trying to " + requestDescription, - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode() + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode(), + "Should have gotten 500 error from trying to " + requestDescription ); assertTrue( + exception.getMessage().contains("Request timed out"), "Should have gotten timeout message from trying to " + requestDescription - + "; instead, message was: " + exception.getMessage(), - exception.getMessage().contains("Request timed out") + + "; instead, message was: " + exception.getMessage() ); if (expectedTimeoutMessage != null) { assertTrue( - "Timeout error message '" + exception.getMessage() + "' does not match expected format", - exception.getMessage().contains(expectedTimeoutMessage) + exception.getMessage().contains(expectedTimeoutMessage), + "Timeout error message '" + exception.getMessage() + "' does not match expected format" ); } // Reset the REST request timeout so that other requests aren't impacted @@ -510,8 +509,8 @@ public class BlockingConnectorTest { CountDownLatch blockLatch; synchronized (Block.class) { assertNotNull( - "Block was reset prematurely", - awaitBlockLatch + awaitBlockLatch, + "Block was reset prematurely" ); awaitBlockLatch.countDown(); blockLatch = newBlockLatch(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java deleted file mode 100644 index 058dbe206fe..00000000000 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.integration; - -import org.junit.rules.TestRule; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.slf4j.Logger; - -/** - * A utility class for Connect's integration tests - */ -public class ConnectIntegrationTestUtils { - public static TestRule newTestWatcher(Logger log) { - return new TestWatcher() { - @Override - protected void starting(Description description) { - super.starting(description); - log.info("Starting test {}", description.getMethodName()); - } - - @Override - protected void finished(Description description) { - super.finished(description); - log.info("Finished test {}", description.getMethodName()); - } - }; - } -} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 84ff88013fe..6b01ff0387f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -41,21 +41,20 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; -import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; import java.io.File; import java.io.FileOutputStream; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -98,16 +97,16 @@ import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertInstanceOf; /** * Test simple operations on the workers of a Connect cluster. */ -@Category(IntegrationTest.class) +@Tag("integration") public class ConnectWorkerIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class); @@ -124,14 +123,9 @@ public class ConnectWorkerIntegrationTest { private Map workerProps; private Properties brokerProps; - @Rule - public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log); - - @Rule - public TemporaryFolder tmp = new TemporaryFolder(); - - @Before - public void setup() { + @BeforeEach + public void setup(TestInfo testInfo) { + log.info("Starting test {}", testInfo.getDisplayName()); // setup Connect worker properties workerProps = new HashMap<>(); workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); @@ -150,8 +144,9 @@ public class ConnectWorkerIntegrationTest { .maskExitProcedures(true); // true is the default, setting here as example } - @After - public void close() { + @AfterEach + public void close(TestInfo testInfo) { + log.info("Finished test {}", testInfo.getDisplayName()); // stop all Connect, Kafka and Zk threads. connect.stop(); } @@ -268,9 +263,9 @@ public class ConnectWorkerIntegrationTest { Thread.sleep(TimeUnit.SECONDS.toMillis(10)); // Wait for the connector to be stopped - assertTrue("Failed to stop connector and tasks after coordinator failure within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to stop connector and tasks after coordinator failure within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false); connect.kafka().startOnlyKafkaOnSamePorts(); @@ -288,9 +283,9 @@ public class ConnectWorkerIntegrationTest { "Connector tasks did not start in time."); // Expect that the connector has started again - assertTrue("Failed to stop connector and tasks after coordinator failure within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to stop connector and tasks after coordinator failure within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); } /** @@ -358,7 +353,7 @@ public class ConnectWorkerIntegrationTest { StartAndStopLatch stopCounter = connector.expectedStops(1); connect.deleteConnector(CONNECTOR_NAME); - assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); + assertTrue(stopCounter.await(1, TimeUnit.MINUTES), "Connector and all tasks were not stopped in time"); } /** @@ -927,8 +922,8 @@ public class ConnectWorkerIntegrationTest { assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), restException.statusCode()); assertNotNull(restException.getMessage()); assertTrue( - "Message '" + restException.getMessage() + "' does not match expected format", - restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription) + restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription), + "Message '" + restException.getMessage() + "' does not match expected format" ); return true; @@ -1166,8 +1161,8 @@ public class ConnectWorkerIntegrationTest { final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0); final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition); assertTrue( - "Source connector should have published at least one record to Kafka", - initialEndOffset > 0 + initialEndOffset > 0, + "Source connector should have published at least one record to Kafka" ); connectorHandle.expectedCommits(NUM_TASKS * 2); @@ -1187,9 +1182,9 @@ public class ConnectWorkerIntegrationTest { // See if any new records got written to the old topic final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition); assertEquals( - "No new records should have been written to the older topic", initialEndOffset, - nextEndOffset + nextEndOffset, + "No new records should have been written to the older topic" ); } @@ -1203,7 +1198,7 @@ public class ConnectWorkerIntegrationTest { * an invalid config provider reference, it will still be possible to reconfigure the connector. */ @Test - public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception { + public void testReconfigureConnectorWithFailingTaskConfigs(@TempDir Path tmp) throws Exception { final int offsetCommitIntervalMs = 100; workerProps.put(CONFIG_PROVIDERS_CONFIG, "file"); workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName()); @@ -1219,7 +1214,7 @@ public class ConnectWorkerIntegrationTest { final String firstConnectorTopic = "connector-topic-1"; connect.kafka().createTopic(firstConnectorTopic); - final File secretsFile = tmp.newFile("test-secrets"); + final File secretsFile = tmp.resolve("test-secrets").toFile(); final Properties secrets = new Properties(); final String throughputSecretKey = "secret-throughput"; secrets.put(throughputSecretKey, "10"); @@ -1244,7 +1239,7 @@ public class ConnectWorkerIntegrationTest { connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); // Delete the secrets file, which should render the old task configs invalid - assertTrue("Failed to delete secrets file", secretsFile.delete()); + assertTrue(secretsFile.delete(), "Failed to delete secrets file"); // Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning // since failure to reconfigure the tasks (which may occur if the bug this test was written @@ -1260,8 +1255,8 @@ public class ConnectWorkerIntegrationTest { connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic); connect.configureConnector(CONNECTOR_NAME, connectorConfig); assertTrue( - "Connector tasks were not restarted in time", - restarts.await(10, TimeUnit.SECONDS) + restarts.await(10, TimeUnit.SECONDS), + "Connector tasks were not restarted in time" ); // Wait for at least one task to commit offsets after being restarted @@ -1270,9 +1265,9 @@ public class ConnectWorkerIntegrationTest { final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0)); assertTrue( + endOffset > 0, "Source connector should have published at least one record to new Kafka topic " - + "after being reconfigured", - endOffset > 0 + + "after being reconfigured" ); } @@ -1307,9 +1302,9 @@ public class ConnectWorkerIntegrationTest { "Connector did not start or task did not fail in time" ); assertEquals( - "Connector should not have any committed offsets when only task fails on first record", new ConnectorOffsets(Collections.emptyList()), - connect.connectorOffsets(CONNECTOR_NAME) + connect.connectorOffsets(CONNECTOR_NAME), + "Connector should not have any committed offsets when only task fails on first record" ); // Reconfigure the connector to use the string converter, which should not cause any more task failures diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java index f09a949010c..ae3639aa5f2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java @@ -24,10 +24,9 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; @@ -39,17 +38,17 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; -@Category(IntegrationTest.class) +@Tag("integration") public class ConnectorClientPolicyIntegrationTest { private static final int NUM_TASKS = 1; private static final int NUM_WORKERS = 1; private static final String CONNECTOR_NAME = "simple-conn"; - @After + @AfterEach public void close() { } @@ -131,7 +130,7 @@ public class ConnectorClientPolicyIntegrationTest { connect.configureConnector(CONNECTOR_NAME, props); fail("Shouldn't be able to create connector"); } catch (ConnectRestException e) { - assertEquals(e.statusCode(), 400); + assertEquals(400, e.statusCode()); } finally { connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index e957b97f611..45dfb96eb93 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -20,15 +20,12 @@ import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.TestRule; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,14 +51,14 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test connectors restart API use cases. */ -@Category(IntegrationTest.class) +@Tag("integration") public class ConnectorRestartApiIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ConnectorRestartApiIntegrationTest.class); @@ -78,14 +75,11 @@ public class ConnectorRestartApiIntegrationTest { private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; private String connectorName; - @Rule - public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log); - @Rule - public TestName testName = new TestName(); - @Before - public void setup() { - connectorName = CONNECTOR_NAME_PREFIX + testName.getMethodName(); + @BeforeEach + public void setup(TestInfo testInfo) { + log.info("Starting test {}", testInfo.getDisplayName()); + connectorName = CONNECTOR_NAME_PREFIX + testInfo.getTestMethod().get().getName(); // get connector handles before starting test. connectorHandle = RuntimeHandles.get().connectorHandle(connectorName); } @@ -115,12 +109,13 @@ public class ConnectorRestartApiIntegrationTest { }); } - @After - public void tearDown() { + @AfterEach + public void tearDown(TestInfo testInfo) { + log.info("Finished test {}", testInfo.getDisplayName()); RuntimeHandles.get().deleteConnector(connectorName); } - @AfterClass + @AfterAll public static void close() { // stop all Connect, Kafka and Zk threads. CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); @@ -268,16 +263,16 @@ public class ConnectorRestartApiIntegrationTest { } // Wait for the connector to be stopped - assertTrue("Failed to stop connector and tasks within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to stop connector and tasks within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, "Connector tasks are not all in running state."); // Expect that the connector has started again - assertTrue("Failed to start connector and tasks within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to start connector and tasks within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot(); assertEquals(beforeSnapshot.starts() + expectedConnectorRestarts, afterSnapshot.starts()); @@ -321,9 +316,9 @@ public class ConnectorRestartApiIntegrationTest { connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(connectorName, 0, "Connector tasks are not all in running state."); // Expect that the connector has started again - assertTrue("Failed to start connector and tasks after coordinator failure within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to start connector and tasks after coordinator failure within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot(); assertEquals(beforeSnapshot.starts() + expectedConnectorRestarts, afterSnapshot.starts()); @@ -356,16 +351,16 @@ public class ConnectorRestartApiIntegrationTest { } // Wait for the connector to be stopped - assertTrue("Failed to stop connector and tasks within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to stop connector and tasks within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); connect.assertions().assertConnectorIsRunningAndNumTasksHaveFailed(connectorName, NUM_TASKS, tasksToFail.size(), "Connector tasks are not all in running state."); // Expect that the connector has started again - assertTrue("Failed to start connector and tasks within " - + CONNECTOR_SETUP_DURATION_MS + "ms", - startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to start connector and tasks within " + + CONNECTOR_SETUP_DURATION_MS + "ms"); StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index e9fdd91c387..2bc3bc45ca6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -26,11 +26,10 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.storage.KafkaStatusBackingStore; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -58,14 +57,14 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration test for the endpoints that offer topic tracking of a connector's active * topics. */ -@Category(IntegrationTest.class) +@Tag("integration") public class ConnectorTopicsIntegrationTest { private static final int NUM_WORKERS = 5; @@ -82,7 +81,7 @@ public class ConnectorTopicsIntegrationTest { Map workerProps = new HashMap<>(); Properties brokerProps = new Properties(); - @Before + @BeforeEach public void setup() { // setup Connect worker properties workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All"); @@ -99,7 +98,7 @@ public class ConnectorTopicsIntegrationTest { .maskExitProcedures(true); // true is the default, setting here as example } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java index 60ffec97e10..d74504c6fe5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java @@ -27,11 +27,10 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.transforms.Filter; import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.HashMap; @@ -56,7 +55,7 @@ import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREAT /** * Integration test for preflight connector config validation */ -@Category(IntegrationTest.class) +@Tag("integration") public class ConnectorValidationIntegrationTest { private static final String WORKER_GROUP_ID = "connect-worker-group-id"; @@ -64,7 +63,7 @@ public class ConnectorValidationIntegrationTest { // Use a single embedded cluster for all test cases in order to cut down on runtime private static EmbeddedConnectCluster connect; - @BeforeClass + @BeforeAll public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); @@ -79,7 +78,7 @@ public class ConnectorValidationIntegrationTest { connect.start(); } - @AfterClass + @AfterAll public static void close() { if (connect != null) { // stop all Connect, Kafka and Zk threads. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 30fe48116fc..9f496c8c423 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -28,13 +28,11 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,18 +60,18 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE; import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records, * and dead letter queues). */ -@Category(IntegrationTest.class) +@Tag("integration") +@Timeout(value = 600) public class ErrorHandlingIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); + private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class); private static final int NUM_WORKERS = 1; private static final String DLQ_TOPIC = "my-connector-errors"; @@ -87,7 +85,7 @@ public class ErrorHandlingIntegrationTest { private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; - @Before + @BeforeEach public void setup() throws InterruptedException { // setup Connect cluster with defaults connect = new EmbeddedConnectCluster.Builder().build(); @@ -99,7 +97,7 @@ public class ErrorHandlingIntegrationTest { connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); } - @After + @AfterEach public void close() { RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); connect.stop(); @@ -158,8 +156,8 @@ public class ErrorHandlingIntegrationTest { String k = new String(rec.key()); String v = new String(rec.value()); log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic()); - assertEquals("Unexpected key", k, "key-" + i); - assertEquals("Unexpected value", v, "value-" + i); + assertEquals(k, "key-" + i, "Unexpected key"); + assertEquals(v, "value-" + i, "Unexpected value"); i++; } @@ -237,8 +235,8 @@ public class ErrorHandlingIntegrationTest { String k = new String(rec.key()); String v = new String(rec.value()); log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic()); - assertEquals("Unexpected key", k, "key-" + i); - assertEquals("Unexpected value", v, "value-" + i); + assertEquals(k, "key-" + i, "Unexpected key"); + assertEquals(v, "value-" + i, "Unexpected value"); i++; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 84ee814ae40..e710d6ac712 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -50,11 +50,10 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.ConnectAssertions; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,14 +99,14 @@ import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CON import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL; import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -@Category(IntegrationTest.class) +@Tag("integration") public class ExactlyOnceSourceIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ExactlyOnceSourceIntegrationTest.class); @@ -130,7 +129,7 @@ public class ExactlyOnceSourceIntegrationTest { private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; - @Before + @BeforeEach public void setup() { workerProps = new HashMap<>(); workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); @@ -156,7 +155,7 @@ public class ExactlyOnceSourceIntegrationTest { connect.start(); } - @After + @AfterEach public void close() { try { // stop all Connect, Kafka and Zk threads. @@ -189,31 +188,32 @@ public class ExactlyOnceSourceIntegrationTest { // Connector will return null from SourceConnector::exactlyOnceSupport props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_NULL); ConfigInfos validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), + "Preflight validation should have exactly one error"); ConfigInfo propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); - assertFalse("Preflight validation for exactly-once support property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for exactly-once support property should have at least one error message"); // Connector will return UNSUPPORTED from SourceConnector::exactlyOnceSupport props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_UNSUPPORTED); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error"); propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); - assertFalse("Preflight validation for exactly-once support property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for exactly-once support property should have at least one error message"); // Connector will throw an exception from SourceConnector::exactlyOnceSupport props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_FAIL); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error"); propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation); - assertFalse("Preflight validation for exactly-once support property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for exactly-once support property should have at least one error message"); // Connector will return SUPPORTED from SourceConnector::exactlyOnceSupport props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have zero errors", 0, validation.errorCount()); + assertEquals(0, validation.errorCount(), "Preflight validation should have zero errors"); // Test out the transaction boundary definition property props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); @@ -221,31 +221,31 @@ public class ExactlyOnceSourceIntegrationTest { // Connector will return null from SourceConnector::canDefineTransactionBoundaries props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_NULL); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error"); propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); - assertFalse("Preflight validation for transaction boundary property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for transaction boundary property should have at least one error message"); // Connector will return UNSUPPORTED from SourceConnector::canDefineTransactionBoundaries props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_UNSUPPORTED); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error"); propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); - assertFalse("Preflight validation for transaction boundary property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for transaction boundary property should have at least one error message"); // Connector will throw an exception from SourceConnector::canDefineTransactionBoundaries props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_FAIL); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount()); + assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error"); propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation); - assertFalse("Preflight validation for transaction boundary property should have at least one error message", - propertyValidation.configValue().errors().isEmpty()); + assertFalse(propertyValidation.configValue().errors().isEmpty(), + "Preflight validation for transaction boundary property should have at least one error message"); // Connector will return SUPPORTED from SourceConnector::canDefineTransactionBoundaries props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED); validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props); - assertEquals("Preflight validation should have zero errors", 0, validation.errorCount()); + assertEquals(0, validation.errorCount(), "Preflight validation should have zero errors"); } /** @@ -302,8 +302,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(), - records.count() >= MINIMUM_MESSAGES); + assertTrue(records.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count()); assertExactlyOnceSeqnos(records, numTasks); } @@ -362,8 +362,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(), - records.count() >= MINIMUM_MESSAGES); + assertTrue(records.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count()); assertExactlyOnceSeqnos(records, numTasks); } @@ -423,8 +423,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(), - sourceRecords.count() >= MINIMUM_MESSAGES); + assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count()); // also consume from the cluster's offsets topic to verify that the expected offsets (which should correspond to the connector's // custom transaction boundaries) were committed @@ -446,8 +446,8 @@ public class ExactlyOnceSourceIntegrationTest { List actualOffsetSeqnos = parseAndAssertOffsetsForSingleTask(offsetRecords); - assertEquals("Committed offsets should match connector-defined transaction boundaries", - expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size())); + assertEquals(expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()), + "Committed offsets should match connector-defined transaction boundaries"); List expectedRecordSeqnos = LongStream.range(1, MINIMUM_MESSAGES + 1).boxed().collect(Collectors.toList()); long priorBoundary = 1; @@ -464,8 +464,8 @@ public class ExactlyOnceSourceIntegrationTest { List actualRecordSeqnos = parseAndAssertValuesForSingleTask(sourceRecords); // Have to sort the records by seqno since we produce to multiple partitions and in-order consumption isn't guaranteed Collections.sort(actualRecordSeqnos); - assertEquals("Committed records should exclude connector-aborted transactions", - expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size())); + assertEquals(expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size()), + "Committed records should exclude connector-aborted transactions"); } /** @@ -534,8 +534,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(), - records.count() >= MINIMUM_MESSAGES); + assertTrue(records.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count()); assertExactlyOnceSeqnos(records, numTasks); } @@ -597,8 +597,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(), - records.count() >= MINIMUM_MESSAGES); + assertTrue(records.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count()); // We used at most five tasks during the tests; each of them should have been able to produce records assertExactlyOnceSeqnos(records, 5); } @@ -809,8 +809,8 @@ public class ExactlyOnceSourceIntegrationTest { Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"), "test-topic") .count(); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum, - recordNum >= MINIMUM_MESSAGES); + assertTrue(recordNum >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum); // also consume from the connector's dedicated offsets topic ConsumerRecords offsetRecords = connectorTargetedCluster @@ -822,16 +822,16 @@ public class ExactlyOnceSourceIntegrationTest { ); List seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords); seqnos.forEach(seqno -> - assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records", - 0, seqno % MINIMUM_MESSAGES) + assertEquals(0, seqno % MINIMUM_MESSAGES, + "Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records") ); // also consume from the cluster's global offsets topic offsetRecords = connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), globalOffsetsTopic); seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords); seqnos.forEach(seqno -> - assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records", - 0, seqno % MINIMUM_MESSAGES) + assertEquals(0, seqno % MINIMUM_MESSAGES, + "Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records") ); // Shut down the whole cluster @@ -876,8 +876,8 @@ public class ExactlyOnceSourceIntegrationTest { null, topic ); - assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(), - sourceRecords.count() >= MINIMUM_MESSAGES); + assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES, + "Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count()); // also have to check which offsets have actually been committed, since we no longer have exactly-once semantics offsetRecords = connectorTargetedCluster.consumeAll( CONSUME_RECORDS_TIMEOUT_MS, @@ -941,13 +941,13 @@ public class ExactlyOnceSourceIntegrationTest { private List parseAndAssertOffsetsForSingleTask(ConsumerRecords offsetRecords) { Map> parsedOffsets = parseOffsetForTasks(offsetRecords); - assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedOffsets.keySet()); + assertEquals(Collections.singleton(0), parsedOffsets.keySet(), "Expected records to only be produced from a single task"); return parsedOffsets.get(0); } private List parseAndAssertValuesForSingleTask(ConsumerRecords sourceRecords) { Map> parsedValues = parseValuesForTasks(sourceRecords); - assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedValues.keySet()); + assertEquals(Collections.singleton(0), parsedValues.keySet(), "Expected records to only be produced from a single task"); return parsedValues.get(0); } @@ -965,7 +965,7 @@ public class ExactlyOnceSourceIntegrationTest { )); parsedValues.replaceAll((task, values) -> { Long committedValue = lastCommittedValues.get(task); - assertNotNull("No committed offset found for task " + task, committedValue); + assertNotNull(committedValue, "No committed offset found for task " + task); return values.stream().filter(v -> v <= committedValue).collect(Collectors.toList()); }); assertSeqnos(parsedValues, numTasks); @@ -973,7 +973,7 @@ public class ExactlyOnceSourceIntegrationTest { private void assertSeqnos(Map> parsedValues, int numTasks) { Set expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet()); - assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet()); + assertEquals(expectedKeys, parsedValues.keySet(), "Expected records to be produced by each task"); parsedValues.forEach((taskId, seqnos) -> { // We don't check for order here because the records may have been produced to multiple topic partitions, @@ -988,11 +988,11 @@ public class ExactlyOnceSourceIntegrationTest { // Try to provide the most friendly error message possible if this test fails assertTrue( + missingSeqnos.isEmpty() && extraSeqnos.isEmpty(), "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, " + "but the actual seqnos did not.\n" + "Seqnos that should have been emitted but were not: " + missingSeqnos + "\n" + - "seqnos that should not have been emitted but were: " + extraSeqnos, - missingSeqnos.isEmpty() && extraSeqnos.isEmpty() + "seqnos that should not have been emitted but were: " + extraSeqnos ); }); } @@ -1000,8 +1000,8 @@ public class ExactlyOnceSourceIntegrationTest { private Map> parseValuesForTasks(ConsumerRecords sourceRecords) { Map> result = new HashMap<>(); for (ConsumerRecord sourceRecord : sourceRecords) { - assertNotNull("Record key should not be null", sourceRecord.key()); - assertNotNull("Record value should not be null", sourceRecord.value()); + assertNotNull(sourceRecord.key(), "Record key should not be null"); + assertNotNull(sourceRecord.value(), "Record value should not be null"); String key = new String(sourceRecord.key()); String value = new String(sourceRecord.value()); @@ -1009,17 +1009,17 @@ public class ExactlyOnceSourceIntegrationTest { String keyPrefix = "key-"; String valuePrefix = "value-"; - assertTrue("Key should start with \"" + keyPrefix + "\"", key.startsWith(keyPrefix)); - assertTrue("Value should start with \"" + valuePrefix + "\"", value.startsWith(valuePrefix)); + assertTrue(key.startsWith(keyPrefix), "Key should start with \"" + keyPrefix + "\""); + assertTrue(value.startsWith(valuePrefix), "Value should start with \"" + valuePrefix + "\""); assertEquals( - "key and value should be identical after prefix", key.substring(keyPrefix.length()), - value.substring(valuePrefix.length()) + value.substring(valuePrefix.length()), + "key and value should be identical after prefix" ); String[] split = key.substring(keyPrefix.length()).split("-"); - assertEquals("Key should match pattern 'key---", 3, split.length); - assertEquals("Key should match pattern 'key---", CONNECTOR_NAME, split[0]); + assertEquals(3, split.length, "Key should match pattern 'key---"); + assertEquals(CONNECTOR_NAME, split[0], "Key should match pattern 'key---"); int taskId; try { @@ -1051,23 +1051,23 @@ public class ExactlyOnceSourceIntegrationTest { Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value(); Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value(); - assertNotNull("Offset key should not be null", keyObject); - assertNotNull("Offset value should not be null", valueObject); + assertNotNull(keyObject, "Offset key should not be null"); + assertNotNull(valueObject, "Offset value should not be null"); @SuppressWarnings("unchecked") List key = assertAndCast(keyObject, List.class, "Key"); assertEquals( - "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition", 2, - key.size() + key.size(), + "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition" ); assertEquals(CONNECTOR_NAME, key.get(0)); @SuppressWarnings("unchecked") Map partition = assertAndCast(key.get(1), Map.class, "Key[1]"); Object taskIdObject = partition.get("task.id"); - assertNotNull("Serialized source partition should contain 'task.id' field from MonitorableSourceConnector", taskIdObject); + assertNotNull(taskIdObject, "Serialized source partition should contain 'task.id' field from MonitorableSourceConnector"); String taskId = assertAndCast(taskIdObject, String.class, "task ID"); - assertTrue("task ID should match pattern '-", taskId.startsWith(CONNECTOR_NAME + "-")); + assertTrue(taskId.startsWith(CONNECTOR_NAME + "-"), "task ID should match pattern '-"); String taskIdRemainder = taskId.substring(CONNECTOR_NAME.length() + 1); int taskNum; try { @@ -1080,7 +1080,7 @@ public class ExactlyOnceSourceIntegrationTest { Map value = assertAndCast(valueObject, Map.class, "Value"); Object seqnoObject = value.get("saved"); - assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject); + assertNotNull(seqnoObject, "Serialized source offset should contain 'seqno' field from MonitorableSourceConnector"); long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field"); result.computeIfAbsent(taskNum, t -> new ArrayList<>()).add(seqno); @@ -1111,21 +1111,22 @@ public class ExactlyOnceSourceIntegrationTest { } private void assertConnectorStarted(StartAndStopLatch connectorStart) throws InterruptedException { - assertTrue("Connector and tasks did not finish startup in time", + assertTrue( connectorStart.await( ConnectAssertions.CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS - ) + ), + "Connector and tasks did not finish startup in time" ); } private void assertConnectorStopped(StartAndStopLatch connectorStop) throws InterruptedException { assertTrue( - "Connector and tasks did not finish shutdown in time", connectorStop.await( ConnectAssertions.CONNECTOR_SHUTDOWN_DURATION_MS, TimeUnit.MILLISECONDS - ) + ), + "Connector and tasks did not finish shutdown in time" ); } @@ -1165,12 +1166,13 @@ public class ExactlyOnceSourceIntegrationTest { private void assertTransactionalProducerIsFenced(KafkaProducer producer, String topic) { producer.beginTransaction(); - assertThrows("Producer should be fenced out", + assertThrows( ProducerFencedException.class, () -> { producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96})); producer.commitTransaction(); - } + }, + "Producer should be fenced out" ); producer.close(Duration.ZERO); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 23a87c2c0fe..8ff9cc7319f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -19,13 +19,10 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +41,8 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * An example integration test that demonstrates how to setup an integration test for Connect. @@ -53,7 +50,7 @@ import static org.junit.Assert.assertTrue; * The following test configures and executes up a sink connector pipeline in a worker, produces messages into * the source topic-partitions, and demonstrates how to check the overall behavior of the pipeline. */ -@Category(IntegrationTest.class) +@Tag("integration") public class ExampleConnectIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ExampleConnectIntegrationTest.class); @@ -71,10 +68,7 @@ public class ExampleConnectIntegrationTest { private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; - @Rule - public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log); - - @Before + @BeforeEach public void setup() { // setup Connect worker properties Map exampleWorkerProps = new HashMap<>(); @@ -100,7 +94,7 @@ public class ExampleConnectIntegrationTest { connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); } - @After + @AfterEach public void close() { // delete connector handle RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); @@ -156,8 +150,9 @@ public class ExampleConnectIntegrationTest { } // consume all records from the source topic or fail, to ensure that they were correctly produced. - assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED, - connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count()); + assertEquals(NUM_RECORDS_PRODUCED, + connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count(), + "Unexpected number of records consumed"); // wait for the connector tasks to consume all records. connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS); @@ -217,8 +212,8 @@ public class ExampleConnectIntegrationTest { // consume all records from the source topic or fail, to ensure that they were correctly produced int recordNum = connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count(); - assertTrue("Not enough records produced by source connector. Expected at least: " + NUM_RECORDS_PRODUCED + " + but got " + recordNum, - recordNum >= NUM_RECORDS_PRODUCED); + assertTrue(recordNum >= NUM_RECORDS_PRODUCED, + "Not enough records produced by source connector. Expected at least: " + NUM_RECORDS_PRODUCED + " + but got " + recordNum); // delete connector connect.deleteConnector(CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index c044bb82298..fb9055dea0c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -25,20 +25,19 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertFalse; /** * Integration test for the creation of internal topics. */ -@Category(IntegrationTest.class) +@Tag("integration") public class InternalTopicsIntegrationTest { private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); @@ -47,13 +46,13 @@ public class InternalTopicsIntegrationTest { Map workerProps = new HashMap<>(); Properties brokerProps = new Properties(); - @Before + @BeforeEach public void setup() { // setup Kafka broker properties brokerProps.put("auto.create.topics.enable", String.valueOf(false)); } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index dc507b68df7..f87476335fb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -29,15 +29,13 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; -import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.NoRetryException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import javax.ws.rs.core.Response; import java.util.ArrayList; @@ -68,14 +66,14 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLAS import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration tests for Kafka Connect's connector offset management REST APIs */ -@Category(IntegrationTest.class) +@Tag("integration") public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); @@ -83,20 +81,18 @@ public class OffsetsApiIntegrationTest { private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; private static final Map, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); - @Rule - public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; private String connectorName; private String topic; - @Before - public void setup() { - connectorName = currentTest.getMethodName(); - topic = currentTest.getMethodName(); + @BeforeEach + public void setup(TestInfo testInfo) { + connectorName = testInfo.getTestMethod().get().getName(); + topic = testInfo.getTestMethod().get().getName(); connect = defaultConnectCluster(); } - @After + @AfterEach public void tearDown() { Set remainingConnectors = new HashSet<>(connect.connectors()); if (remainingConnectors.remove(connectorName)) { @@ -104,9 +100,9 @@ public class OffsetsApiIntegrationTest { } try { assertEquals( - "Some connectors were not properly cleaned up after this test", Collections.emptySet(), - remainingConnectors + remainingConnectors, + "Some connectors were not properly cleaned up after this test" ); } finally { // Make a last-ditch effort to clean up the leaked connectors @@ -115,7 +111,7 @@ public class OffsetsApiIntegrationTest { } } - @AfterClass + @AfterAll public static void close() { // stop all Connect, Kafka and Zk threads. CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java index 4c803a34921..af37d8bb38a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -19,13 +19,11 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestRule; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +50,14 @@ import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompat import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration tests for incremental cooperative rebalancing between Connect workers */ -@Category(IntegrationTest.class) +@Tag("integration") public class RebalanceSourceConnectorsIntegrationTest { private static final Logger log = LoggerFactory.getLogger(RebalanceSourceConnectorsIntegrationTest.class); @@ -73,12 +71,11 @@ public class RebalanceSourceConnectorsIntegrationTest { private static final String TOPIC_NAME = "sequential-topic"; private EmbeddedConnectCluster connect; + - @Rule - public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log); - - @Before - public void setup() { + @BeforeEach + public void setup(TestInfo testInfo) { + log.info("Starting test {}", testInfo.getDisplayName()); // setup Connect worker properties Map workerProps = new HashMap<>(); workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString()); @@ -102,8 +99,9 @@ public class RebalanceSourceConnectorsIntegrationTest { connect.start(); } - @After - public void close() { + @AfterEach + public void close(TestInfo testInfo) { + log.info("Finished test {}", testInfo.getDisplayName()); // stop all Connect, Kafka and Zk threads. connect.stop(); } @@ -155,8 +153,8 @@ public class RebalanceSourceConnectorsIntegrationTest { // consume all records from the source topic or fail, to ensure that they were correctly produced int recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, TOPIC_NAME).count(); - assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum, - recordNum >= numRecordsProduced); + assertTrue(recordNum >= numRecordsProduced, + "Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum); // expect that we're going to restart the connector and its tasks StartAndStopLatch restartLatch = connectorHandle.expectedStarts(1); @@ -166,9 +164,9 @@ public class RebalanceSourceConnectorsIntegrationTest { connect.configureConnector(CONNECTOR_NAME, props); // Wait for the connector *and tasks* to be restarted - assertTrue("Failed to alter connector configuration and see connector and tasks restart " - + "within " + CONNECTOR_SETUP_DURATION_MS + "ms", - restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS)); + assertTrue(restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS), + "Failed to alter connector configuration and see connector and tasks restart " + + "within " + CONNECTOR_SETUP_DURATION_MS + "ms"); // And wait for the Connect to show the connectors and tasks are running connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, @@ -176,8 +174,8 @@ public class RebalanceSourceConnectorsIntegrationTest { // consume all records from the source topic or fail, to ensure that they were correctly produced recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count(); - assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum, - recordNum >= numRecordsProduced); + assertTrue(recordNum >= numRecordsProduced, + "Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum); } @Test @@ -332,16 +330,16 @@ public class RebalanceSourceConnectorsIntegrationTest { log.debug("Connector balance: {}", formatAssignment(connectors)); log.debug("Task balance: {}", formatAssignment(tasks)); - assertNotEquals("Found no connectors running!", maxConnectors, 0); - assertNotEquals("Found no tasks running!", maxTasks, 0); - assertEquals("Connector assignments are not unique: " + connectors, - connectors.values().size(), - connectors.values().stream().distinct().count()); - assertEquals("Task assignments are not unique: " + tasks, - tasks.values().size(), - tasks.values().stream().distinct().count()); - assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2); - assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2); + assertNotEquals(0, maxConnectors, "Found no connectors running!"); + assertNotEquals(0, maxTasks, "Found no tasks running!"); + assertEquals(connectors.values().size(), + connectors.values().stream().distinct().count(), + "Connector assignments are not unique: " + connectors); + assertEquals(tasks.values().size(), + tasks.values().stream().distinct().count(), + "Task assignments are not unique: " + tasks); + assertTrue(maxConnectors - minConnectors < 2, "Connectors are imbalanced: " + formatAssignment(connectors)); + assertTrue(maxTasks - minTasks < 2, "Tasks are imbalanced: " + formatAssignment(tasks)); return true; } catch (Exception e) { log.error("Could not check connector state info.", e); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index 112dd219811..8562cd02c14 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -26,10 +26,9 @@ import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -46,12 +45,12 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.rest.RestServerConfig.REST_EXTENSION_CLASSES_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * A simple integration test to ensure that REST extensions are registered correctly. */ -@Category(IntegrationTest.class) +@Tag("integration") public class RestExtensionIntegrationTest { private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); @@ -132,7 +131,7 @@ public class RestExtensionIntegrationTest { } } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java index 2e63a2518b3..2660e58b5ab 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java @@ -41,18 +41,19 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.runtime.rest.util.SSLUtils; import org.apache.kafka.connect.util.Callback; -import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import javax.net.ssl.SSLContext; import java.io.IOException; @@ -74,9 +75,10 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.StrictStubs.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) @SuppressWarnings("unchecked") -@Category(IntegrationTest.class) +@Tag("integration") public class RestForwardingIntegrationTest { private Map sslConfig; @@ -93,13 +95,13 @@ public class RestForwardingIntegrationTest { private CloseableHttpClient httpClient; private Collection responses; - @Before + @BeforeEach public void setUp() throws IOException, GeneralSecurityException { sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); responses = new ArrayList<>(); } - @After + @AfterEach public void tearDown() throws IOException { for (CloseableHttpResponse response: responses) { response.close(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 8f71033b798..f34ad25b5cf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -19,11 +19,10 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +40,13 @@ import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG; import static org.apache.kafka.connect.runtime.rest.InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER; import static org.apache.kafka.connect.runtime.rest.InternalRequestSignature.SIGNATURE_HEADER; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * A simple integration test to ensure that internal request validation becomes enabled with the * "sessioned" protocol. */ -@Category(IntegrationTest.class) +@Tag("integration") public class SessionedProtocolIntegrationTest { private static final Logger log = LoggerFactory.getLogger(SessionedProtocolIntegrationTest.class); @@ -58,7 +57,7 @@ public class SessionedProtocolIntegrationTest { private EmbeddedConnectCluster connect; private ConnectorHandle connectorHandle; - @Before + @BeforeEach public void setup() { // setup Connect worker properties Map workerProps = new HashMap<>(); @@ -79,7 +78,7 @@ public class SessionedProtocolIntegrationTest { connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java index f42addf396c..67c41580a4f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java @@ -22,11 +22,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collection; @@ -48,13 +47,13 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration test for sink connectors */ -@Category(IntegrationTest.class) +@Tag("integration") public class SinkConnectorsIntegrationTest { private static final int NUM_TASKS = 1; @@ -64,7 +63,7 @@ public class SinkConnectorsIntegrationTest { private EmbeddedConnectCluster connect; - @Before + @BeforeEach public void setup() throws Exception { Map workerProps = new HashMap<>(); // permit all Kafka client overrides; required for testing different consumer partition assignment strategies @@ -85,7 +84,7 @@ public class SinkConnectorsIntegrationTest { connect.start(); } - @After + @AfterEach public void close() { // delete connector handle RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); @@ -111,7 +110,7 @@ public class SinkConnectorsIntegrationTest { "5000"); final Set consumedRecordValues = new HashSet<>(); - Consumer onPut = record -> assertTrue("Task received duplicate record from Connect", consumedRecordValues.add(Objects.toString(record.value()))); + Consumer onPut = record -> assertTrue(consumedRecordValues.add(Objects.toString(record.value())), "Task received duplicate record from Connect"); ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); TaskHandle task = connector.taskHandle(CONNECTOR_NAME + "-0", onPut); @@ -217,7 +216,7 @@ public class SinkConnectorsIntegrationTest { "5000"); final Set consumedRecordValues = new HashSet<>(); - Consumer onPut = record -> assertTrue("Task received duplicate record from Connect", consumedRecordValues.add(Objects.toString(record.value()))); + Consumer onPut = record -> assertTrue(consumedRecordValues.add(Objects.toString(record.value())), "Task received duplicate record from Connect"); ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); TaskHandle task = connector.taskHandle(CONNECTOR_NAME + "-0", onPut); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java index 046e5a9ccf7..3ff324f1dde 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java @@ -19,11 +19,10 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -51,7 +50,7 @@ import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFA * Integration test for source connectors with a focus on topic creation with custom properties by * the connector tasks. */ -@Category(IntegrationTest.class) +@Tag("integration") public class SourceConnectorsIntegrationTest { private static final int NUM_WORKERS = 3; @@ -72,7 +71,7 @@ public class SourceConnectorsIntegrationTest { Map workerProps = new HashMap<>(); Properties brokerProps = new Properties(); - @Before + @BeforeEach public void setup() { // setup Connect worker properties workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All"); @@ -89,7 +88,7 @@ public class SourceConnectorsIntegrationTest { .maskExitProcedures(true); // true is the default, setting here as example } - @After + @AfterEach public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java index e47fe4304d3..ccb7fa9c5b8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java @@ -21,11 +21,10 @@ import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -43,13 +42,13 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX; import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG; import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; -@Category(IntegrationTest.class) +@Tag("integration") public class StandaloneWorkerIntegrationTest { private static final String CONNECTOR_NAME = "test-connector"; @@ -58,14 +57,14 @@ public class StandaloneWorkerIntegrationTest { private EmbeddedConnectStandalone connect; - @Before + @BeforeEach public void setup() { connect = new EmbeddedConnectStandalone.Builder() .build(); connect.start(); } - @After + @AfterEach public void cleanup() { connect.stop(); } @@ -73,15 +72,15 @@ public class StandaloneWorkerIntegrationTest { @Test public void testDynamicLogging() { Map initialLevels = connect.allLogLevels(); - assertFalse("Connect REST API did not list any known loggers", initialLevels.isEmpty()); + assertFalse(initialLevels.isEmpty(), "Connect REST API did not list any known loggers"); Map invalidModifiedLoggers = Utils.filterMap( initialLevels, StandaloneWorkerIntegrationTest::isModified ); assertEquals( - "No loggers should have a non-null last-modified timestamp", Collections.emptyMap(), - invalidModifiedLoggers + invalidModifiedLoggers, + "No loggers should have a non-null last-modified timestamp" ); // Tests with no scope @@ -103,9 +102,9 @@ public class StandaloneWorkerIntegrationTest { connect.setLogLevel(namespace2, level2, "worker"); LoggerLevel currentLoggerLevel = connect.getLogLevel(namespace2); assertEquals( - "Log level and last-modified timestamp should not be affected by consecutive identical requests", priorLoggerLevel, - currentLoggerLevel + currentLoggerLevel, + "Log level and last-modified timestamp should not be affected by consecutive identical requests" ); // Tests with scope=cluster @@ -125,8 +124,8 @@ public class StandaloneWorkerIntegrationTest { List affectedLoggers = connect.setLogLevel(namespace, level, scope); if ("cluster".equals(scope)) { assertNull( - "Modifying log levels with scope=cluster should result in an empty response", - affectedLoggers + affectedLoggers, + "Modifying log levels with scope=cluster should result in an empty response" ); } else { assertTrue(affectedLoggers.contains(namespace)); @@ -134,10 +133,10 @@ public class StandaloneWorkerIntegrationTest { .filter(l -> !l.startsWith(namespace)) .collect(Collectors.toList()); assertEquals( - "No loggers outside the namespace '" + namespace - + "' should have been included in the response for a request to modify that namespace", Collections.emptyList(), - invalidAffectedLoggers + invalidAffectedLoggers, + "No loggers outside the namespace '" + namespace + + "' should have been included in the response for a request to modify that namespace" ); } @@ -148,9 +147,9 @@ public class StandaloneWorkerIntegrationTest { assertEquals(level, loggerLevel.level()); assertNotNull(loggerLevel.lastModified()); assertTrue( + loggerLevel.lastModified() >= requestTime, "Last-modified timestamp for logger level is " + loggerLevel.lastModified() - + ", which is before " + requestTime + ", the most-recent time the level was adjusted", - loggerLevel.lastModified() >= requestTime + + ", which is before " + requestTime + ", the most-recent time the level was adjusted" ); // Verify information for all listed loggers @@ -166,23 +165,23 @@ public class StandaloneWorkerIntegrationTest { ) ); assertEquals( + Collections.emptyMap(), + invalidAffectedLoggerLevels, "At least one logger in the affected namespace '" + namespace + "' does not have the expected level of '" + level + "', has a null last-modified timestamp, or has a last-modified timestamp " + "that is less recent than " + requestTime - + ", which is when the namespace was last adjusted", - Collections.emptyMap(), - invalidAffectedLoggerLevels + + ", which is when the namespace was last adjusted" ); Set droppedLoggers = Utils.diff(HashSet::new, initialLevels.keySet(), newLevels.keySet()); assertEquals( + Collections.emptySet(), + droppedLoggers, "At least one logger was present in the listing of all loggers " + "before the logging level for namespace '" + namespace + "' was set to '" + level - + "' that is no longer present", - Collections.emptySet(), - droppedLoggers + + "' that is no longer present" ); Map invalidUnaffectedLoggerLevels = Utils.filterMap( @@ -190,12 +189,12 @@ public class StandaloneWorkerIntegrationTest { e -> !hasNamespace(e, namespace) && !e.getValue().equals(initialLevels.get(e.getKey())) ); assertEquals( + Collections.emptyMap(), + invalidUnaffectedLoggerLevels, "At least one logger outside of the affected namespace '" + namespace + "' has a different logging level or last-modified timestamp than it did " + "before the namespace was set to level '" + level - + "'; none of these loggers should have been affected", - Collections.emptyMap(), - invalidUnaffectedLoggerLevels + + "'; none of these loggers should have been affected" ); return newLevels; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java index 43cb86149da..5ecf83c5222 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java @@ -24,17 +24,16 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; -@Category(IntegrationTest.class) +@Tag("integration") public class StartAndStopCounterTest { private StartAndStopCounter counter; @@ -42,13 +41,13 @@ public class StartAndStopCounterTest { private ExecutorService waiters; private StartAndStopLatch latch; - @Before + @BeforeEach public void setup() { clock = new MockTime(); counter = new StartAndStopCounter(clock); } - @After + @AfterEach public void teardown() { if (waiters != null) { try { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java index ebf546dfcd0..8ac18ed812d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java @@ -27,16 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.test.IntegrationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; -@Category(IntegrationTest.class) +@Tag("integration") public class StartAndStopLatchTest { private final AtomicBoolean completed = new AtomicBoolean(); @@ -46,13 +45,13 @@ public class StartAndStopLatchTest { private ExecutorService waiters; private Future future; - @Before + @BeforeEach public void setup() { clock = new MockTime(); waiters = Executors.newSingleThreadExecutor(); } - @After + @AfterEach public void teardown() { if (waiters != null) { waiters.shutdownNow(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java index 760684425df..17d2e7da57a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java @@ -23,11 +23,10 @@ import org.apache.kafka.connect.transforms.predicates.HasHeaderKey; import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; import org.apache.kafka.connect.transforms.predicates.TopicNameMatches; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.test.IntegrationTest; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,13 +48,13 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; /** * An integration test for connectors with transformations */ -@Category(IntegrationTest.class) +@Tag("integration") public class TransformationIntegrationTest { private static final Logger log = LoggerFactory.getLogger(TransformationIntegrationTest.class); @@ -159,10 +158,16 @@ public class TransformationIntegrationTest { } // consume all records from the source topic or fail, to ensure that they were correctly produced. - assertEquals("Unexpected number of records consumed", numFooRecords, - connect.kafka().consume(numFooRecords, RECORD_TRANSFER_DURATION_MS, fooTopic).count()); - assertEquals("Unexpected number of records consumed", numBarRecords, - connect.kafka().consume(numBarRecords, RECORD_TRANSFER_DURATION_MS, barTopic).count()); + assertEquals( + numFooRecords, + connect.kafka().consume(numFooRecords, RECORD_TRANSFER_DURATION_MS, fooTopic).count(), + "Unexpected number of records consumed" + ); + assertEquals( + numBarRecords, + connect.kafka().consume(numBarRecords, RECORD_TRANSFER_DURATION_MS, barTopic).count(), + "Unexpected number of records consumed" + ); // wait for the connector tasks to consume all records. connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS); @@ -239,8 +244,11 @@ public class TransformationIntegrationTest { } // consume all records from the source topic or fail, to ensure that they were correctly produced. - assertEquals("Unexpected number of records consumed", numRecords, - connect.kafka().consume(numRecords, RECORD_TRANSFER_DURATION_MS, topic).count()); + assertEquals( + numRecords, + connect.kafka().consume(numRecords, RECORD_TRANSFER_DURATION_MS, topic).count(), + "Unexpected number of records consumed" + ); // wait for the connector tasks to consume all records. connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS); @@ -305,8 +313,7 @@ public class TransformationIntegrationTest { // consume all records from the source topic or fail, to ensure that they were correctly produced for (ConsumerRecord record : connect.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "test-topic")) { - assertNotNull("Expected header to exist", - record.headers().lastHeader("header-8")); + assertNotNull(record.headers().lastHeader("header-8"), "Expected header to exist"); } // delete connector