KAFKA-16921 [3/N] migrate connect module to junit 5 (#16330)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2024-06-15 01:37:59 +09:00 committed by GitHub
parent 8682334b6a
commit fc6f8b6591
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 411 additions and 478 deletions

View File

@ -38,12 +38,11 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.function.ThrowingRunnable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -68,16 +67,16 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests situations during which certain connector operations, such as start, validation,
* configuration and others, take longer than expected.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class BlockingConnectorTest {
private static final Logger log = LoggerFactory.getLogger(BlockingConnectorTest.class);
@ -121,7 +120,7 @@ public class BlockingConnectorTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle normalConnectorHandle;
@Before
@BeforeEach
public void setup() throws Exception {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
@ -136,7 +135,7 @@ public class BlockingConnectorTest {
connect.start();
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
@ -354,26 +353,26 @@ public class BlockingConnectorTest {
normalConnectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);
}
private void assertRequestTimesOut(String requestDescription, ThrowingRunnable request, String expectedTimeoutMessage) {
private void assertRequestTimesOut(String requestDescription, Executable request, String expectedTimeoutMessage) {
// Artificially reduce the REST request timeout so that these don't take 90 seconds
connect.requestTimeout(REDUCED_REST_REQUEST_TIMEOUT);
ConnectRestException exception = assertThrows(
"Should have failed to " + requestDescription,
ConnectRestException.class, request
ConnectRestException.class, request,
"Should have failed to " + requestDescription
);
assertEquals(
"Should have gotten 500 error from trying to " + requestDescription,
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode()
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.statusCode(),
"Should have gotten 500 error from trying to " + requestDescription
);
assertTrue(
exception.getMessage().contains("Request timed out"),
"Should have gotten timeout message from trying to " + requestDescription
+ "; instead, message was: " + exception.getMessage(),
exception.getMessage().contains("Request timed out")
+ "; instead, message was: " + exception.getMessage()
);
if (expectedTimeoutMessage != null) {
assertTrue(
"Timeout error message '" + exception.getMessage() + "' does not match expected format",
exception.getMessage().contains(expectedTimeoutMessage)
exception.getMessage().contains(expectedTimeoutMessage),
"Timeout error message '" + exception.getMessage() + "' does not match expected format"
);
}
// Reset the REST request timeout so that other requests aren't impacted
@ -510,8 +509,8 @@ public class BlockingConnectorTest {
CountDownLatch blockLatch;
synchronized (Block.class) {
assertNotNull(
"Block was reset prematurely",
awaitBlockLatch
awaitBlockLatch,
"Block was reset prematurely"
);
awaitBlockLatch.countDown();
blockLatch = newBlockLatch();

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.integration;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
/**
* A utility class for Connect's integration tests
*/
public class ConnectIntegrationTestUtils {
public static TestRule newTestWatcher(Logger log) {
return new TestWatcher() {
@Override
protected void starting(Description description) {
super.starting(description);
log.info("Starting test {}", description.getMethodName());
}
@Override
protected void finished(Description description) {
super.finished(description);
log.info("Finished test {}", description.getMethodName());
}
};
}
}

View File

@ -41,21 +41,20 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -98,16 +97,16 @@ import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
/**
* Test simple operations on the workers of a Connect cluster.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectWorkerIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
@ -124,14 +123,9 @@ public class ConnectWorkerIntegrationTest {
private Map<String, String> workerProps;
private Properties brokerProps;
@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
@Rule
public TemporaryFolder tmp = new TemporaryFolder();
@Before
public void setup() {
@BeforeEach
public void setup(TestInfo testInfo) {
log.info("Starting test {}", testInfo.getDisplayName());
// setup Connect worker properties
workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
@ -150,8 +144,9 @@ public class ConnectWorkerIntegrationTest {
.maskExitProcedures(true); // true is the default, setting here as example
}
@After
public void close() {
@AfterEach
public void close(TestInfo testInfo) {
log.info("Finished test {}", testInfo.getDisplayName());
// stop all Connect, Kafka and Zk threads.
connect.stop();
}
@ -268,9 +263,9 @@ public class ConnectWorkerIntegrationTest {
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
// Wait for the connector to be stopped
assertTrue("Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false);
connect.kafka().startOnlyKafkaOnSamePorts();
@ -288,9 +283,9 @@ public class ConnectWorkerIntegrationTest {
"Connector tasks did not start in time.");
// Expect that the connector has started again
assertTrue("Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
}
/**
@ -358,7 +353,7 @@ public class ConnectWorkerIntegrationTest {
StartAndStopLatch stopCounter = connector.expectedStops(1);
connect.deleteConnector(CONNECTOR_NAME);
assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
assertTrue(stopCounter.await(1, TimeUnit.MINUTES), "Connector and all tasks were not stopped in time");
}
/**
@ -927,8 +922,8 @@ public class ConnectWorkerIntegrationTest {
assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), restException.statusCode());
assertNotNull(restException.getMessage());
assertTrue(
"Message '" + restException.getMessage() + "' does not match expected format",
restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription)
restException.getMessage().contains("Request timed out. The worker is currently " + expectedStageDescription),
"Message '" + restException.getMessage() + "' does not match expected format"
);
return true;
@ -1166,8 +1161,8 @@ public class ConnectWorkerIntegrationTest {
final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0);
final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition);
assertTrue(
"Source connector should have published at least one record to Kafka",
initialEndOffset > 0
initialEndOffset > 0,
"Source connector should have published at least one record to Kafka"
);
connectorHandle.expectedCommits(NUM_TASKS * 2);
@ -1187,9 +1182,9 @@ public class ConnectWorkerIntegrationTest {
// See if any new records got written to the old topic
final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition);
assertEquals(
"No new records should have been written to the older topic",
initialEndOffset,
nextEndOffset
nextEndOffset,
"No new records should have been written to the older topic"
);
}
@ -1203,7 +1198,7 @@ public class ConnectWorkerIntegrationTest {
* an invalid config provider reference, it will still be possible to reconfigure the connector.
*/
@Test
public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
public void testReconfigureConnectorWithFailingTaskConfigs(@TempDir Path tmp) throws Exception {
final int offsetCommitIntervalMs = 100;
workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName());
@ -1219,7 +1214,7 @@ public class ConnectWorkerIntegrationTest {
final String firstConnectorTopic = "connector-topic-1";
connect.kafka().createTopic(firstConnectorTopic);
final File secretsFile = tmp.newFile("test-secrets");
final File secretsFile = tmp.resolve("test-secrets").toFile();
final Properties secrets = new Properties();
final String throughputSecretKey = "secret-throughput";
secrets.put(throughputSecretKey, "10");
@ -1244,7 +1239,7 @@ public class ConnectWorkerIntegrationTest {
connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
// Delete the secrets file, which should render the old task configs invalid
assertTrue("Failed to delete secrets file", secretsFile.delete());
assertTrue(secretsFile.delete(), "Failed to delete secrets file");
// Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning
// since failure to reconfigure the tasks (which may occur if the bug this test was written
@ -1260,8 +1255,8 @@ public class ConnectWorkerIntegrationTest {
connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
assertTrue(
"Connector tasks were not restarted in time",
restarts.await(10, TimeUnit.SECONDS)
restarts.await(10, TimeUnit.SECONDS),
"Connector tasks were not restarted in time"
);
// Wait for at least one task to commit offsets after being restarted
@ -1270,9 +1265,9 @@ public class ConnectWorkerIntegrationTest {
final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
assertTrue(
endOffset > 0,
"Source connector should have published at least one record to new Kafka topic "
+ "after being reconfigured",
endOffset > 0
+ "after being reconfigured"
);
}
@ -1307,9 +1302,9 @@ public class ConnectWorkerIntegrationTest {
"Connector did not start or task did not fail in time"
);
assertEquals(
"Connector should not have any committed offsets when only task fails on first record",
new ConnectorOffsets(Collections.emptyList()),
connect.connectorOffsets(CONNECTOR_NAME)
connect.connectorOffsets(CONNECTOR_NAME),
"Connector should not have any committed offsets when only task fails on first record"
);
// Reconfigure the connector to use the string converter, which should not cause any more task failures

View File

@ -24,10 +24,9 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
@ -39,17 +38,17 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_TASKS = 1;
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
@After
@AfterEach
public void close() {
}
@ -131,7 +130,7 @@ public class ConnectorClientPolicyIntegrationTest {
connect.configureConnector(CONNECTOR_NAME, props);
fail("Shouldn't be able to create connector");
} catch (ConnectRestException e) {
assertEquals(e.statusCode(), 400);
assertEquals(400, e.statusCode());
} finally {
connect.stop();
}

View File

@ -20,15 +20,12 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,14 +51,14 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test connectors restart API use cases.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectorRestartApiIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ConnectorRestartApiIntegrationTest.class);
@ -78,14 +75,11 @@ public class ConnectorRestartApiIntegrationTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
private String connectorName;
@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
@Rule
public TestName testName = new TestName();
@Before
public void setup() {
connectorName = CONNECTOR_NAME_PREFIX + testName.getMethodName();
@BeforeEach
public void setup(TestInfo testInfo) {
log.info("Starting test {}", testInfo.getDisplayName());
connectorName = CONNECTOR_NAME_PREFIX + testInfo.getTestMethod().get().getName();
// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(connectorName);
}
@ -115,12 +109,13 @@ public class ConnectorRestartApiIntegrationTest {
});
}
@After
public void tearDown() {
@AfterEach
public void tearDown(TestInfo testInfo) {
log.info("Finished test {}", testInfo.getDisplayName());
RuntimeHandles.get().deleteConnector(connectorName);
}
@AfterClass
@AfterAll
public static void close() {
// stop all Connect, Kafka and Zk threads.
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
@ -268,16 +263,16 @@ public class ConnectorRestartApiIntegrationTest {
}
// Wait for the connector to be stopped
assertTrue("Failed to stop connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks are not all in running state.");
// Expect that the connector has started again
assertTrue("Failed to start connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to start connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot();
assertEquals(beforeSnapshot.starts() + expectedConnectorRestarts, afterSnapshot.starts());
@ -321,9 +316,9 @@ public class ConnectorRestartApiIntegrationTest {
connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(connectorName, 0,
"Connector tasks are not all in running state.");
// Expect that the connector has started again
assertTrue("Failed to start connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to start connector and tasks after coordinator failure within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot();
assertEquals(beforeSnapshot.starts() + expectedConnectorRestarts, afterSnapshot.starts());
@ -356,16 +351,16 @@ public class ConnectorRestartApiIntegrationTest {
}
// Wait for the connector to be stopped
assertTrue("Failed to stop connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to stop connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
connect.assertions().assertConnectorIsRunningAndNumTasksHaveFailed(connectorName, NUM_TASKS, tasksToFail.size(),
"Connector tasks are not all in running state.");
// Expect that the connector has started again
assertTrue("Failed to start connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms",
startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to start connector and tasks within "
+ CONNECTOR_SETUP_DURATION_MS + "ms");
StartsAndStops afterSnapshot = connectorHandle.startAndStopCounter().countsSnapshot();

View File

@ -26,11 +26,10 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@ -58,14 +57,14 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for the endpoints that offer topic tracking of a connector's active
* topics.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectorTopicsIntegrationTest {
private static final int NUM_WORKERS = 5;
@ -82,7 +81,7 @@ public class ConnectorTopicsIntegrationTest {
Map<String, String> workerProps = new HashMap<>();
Properties brokerProps = new Properties();
@Before
@BeforeEach
public void setup() {
// setup Connect worker properties
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
@ -99,7 +98,7 @@ public class ConnectorTopicsIntegrationTest {
.maskExitProcedures(true); // true is the default, setting here as example
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();

View File

@ -27,11 +27,10 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
@ -56,7 +55,7 @@ import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREAT
/**
* Integration test for preflight connector config validation
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ConnectorValidationIntegrationTest {
private static final String WORKER_GROUP_ID = "connect-worker-group-id";
@ -64,7 +63,7 @@ public class ConnectorValidationIntegrationTest {
// Use a single embedded cluster for all test cases in order to cut down on runtime
private static EmbeddedConnectCluster connect;
@BeforeClass
@BeforeAll
public static void setup() {
Map<String, String> workerProps = new HashMap<>();
workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);
@ -79,7 +78,7 @@ public class ConnectorValidationIntegrationTest {
connect.start();
}
@AfterClass
@AfterAll
public static void close() {
if (connect != null) {
// stop all Connect, Kafka and Zk threads.

View File

@ -28,13 +28,11 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,18 +60,18 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records,
* and dead letter queues).
*/
@Category(IntegrationTest.class)
@Tag("integration")
@Timeout(value = 600)
public class ErrorHandlingIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
@ -87,7 +85,7 @@ public class ErrorHandlingIntegrationTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@Before
@BeforeEach
public void setup() throws InterruptedException {
// setup Connect cluster with defaults
connect = new EmbeddedConnectCluster.Builder().build();
@ -99,7 +97,7 @@ public class ErrorHandlingIntegrationTest {
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
}
@After
@AfterEach
public void close() {
RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
connect.stop();
@ -158,8 +156,8 @@ public class ErrorHandlingIntegrationTest {
String k = new String(rec.key());
String v = new String(rec.value());
log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
assertEquals("Unexpected key", k, "key-" + i);
assertEquals("Unexpected value", v, "value-" + i);
assertEquals(k, "key-" + i, "Unexpected key");
assertEquals(v, "value-" + i, "Unexpected value");
i++;
}
@ -237,8 +235,8 @@ public class ErrorHandlingIntegrationTest {
String k = new String(rec.key());
String v = new String(rec.value());
log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
assertEquals("Unexpected key", k, "key-" + i);
assertEquals("Unexpected value", v, "value-" + i);
assertEquals(k, "key-" + i, "Unexpected key");
assertEquals(v, "value-" + i, "Unexpected value");
i++;
}

View File

@ -50,11 +50,10 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.ConnectAssertions;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,14 +99,14 @@ import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CON
import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@Category(IntegrationTest.class)
@Tag("integration")
public class ExactlyOnceSourceIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ExactlyOnceSourceIntegrationTest.class);
@ -130,7 +129,7 @@ public class ExactlyOnceSourceIntegrationTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@Before
@BeforeEach
public void setup() {
workerProps = new HashMap<>();
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
@ -156,7 +155,7 @@ public class ExactlyOnceSourceIntegrationTest {
connect.start();
}
@After
@AfterEach
public void close() {
try {
// stop all Connect, Kafka and Zk threads.
@ -189,31 +188,32 @@ public class ExactlyOnceSourceIntegrationTest {
// Connector will return null from SourceConnector::exactlyOnceSupport
props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_NULL);
ConfigInfos validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(),
"Preflight validation should have exactly one error");
ConfigInfo propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
assertFalse("Preflight validation for exactly-once support property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for exactly-once support property should have at least one error message");
// Connector will return UNSUPPORTED from SourceConnector::exactlyOnceSupport
props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_UNSUPPORTED);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error");
propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
assertFalse("Preflight validation for exactly-once support property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for exactly-once support property should have at least one error message");
// Connector will throw an exception from SourceConnector::exactlyOnceSupport
props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_FAIL);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error");
propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
assertFalse("Preflight validation for exactly-once support property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for exactly-once support property should have at least one error message");
// Connector will return SUPPORTED from SourceConnector::exactlyOnceSupport
props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have zero errors", 0, validation.errorCount());
assertEquals(0, validation.errorCount(), "Preflight validation should have zero errors");
// Test out the transaction boundary definition property
props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString());
@ -221,31 +221,31 @@ public class ExactlyOnceSourceIntegrationTest {
// Connector will return null from SourceConnector::canDefineTransactionBoundaries
props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_NULL);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error");
propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
assertFalse("Preflight validation for transaction boundary property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for transaction boundary property should have at least one error message");
// Connector will return UNSUPPORTED from SourceConnector::canDefineTransactionBoundaries
props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_UNSUPPORTED);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error");
propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
assertFalse("Preflight validation for transaction boundary property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for transaction boundary property should have at least one error message");
// Connector will throw an exception from SourceConnector::canDefineTransactionBoundaries
props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_FAIL);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
assertEquals(1, validation.errorCount(), "Preflight validation should have exactly one error");
propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
assertFalse("Preflight validation for transaction boundary property should have at least one error message",
propertyValidation.configValue().errors().isEmpty());
assertFalse(propertyValidation.configValue().errors().isEmpty(),
"Preflight validation for transaction boundary property should have at least one error message");
// Connector will return SUPPORTED from SourceConnector::canDefineTransactionBoundaries
props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
assertEquals("Preflight validation should have zero errors", 0, validation.errorCount());
assertEquals(0, validation.errorCount(), "Preflight validation should have zero errors");
}
/**
@ -302,8 +302,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
records.count() >= MINIMUM_MESSAGES);
assertTrue(records.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count());
assertExactlyOnceSeqnos(records, numTasks);
}
@ -362,8 +362,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
records.count() >= MINIMUM_MESSAGES);
assertTrue(records.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count());
assertExactlyOnceSeqnos(records, numTasks);
}
@ -423,8 +423,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
sourceRecords.count() >= MINIMUM_MESSAGES);
assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count());
// also consume from the cluster's offsets topic to verify that the expected offsets (which should correspond to the connector's
// custom transaction boundaries) were committed
@ -446,8 +446,8 @@ public class ExactlyOnceSourceIntegrationTest {
List<Long> actualOffsetSeqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
assertEquals("Committed offsets should match connector-defined transaction boundaries",
expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()));
assertEquals(expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()),
"Committed offsets should match connector-defined transaction boundaries");
List<Long> expectedRecordSeqnos = LongStream.range(1, MINIMUM_MESSAGES + 1).boxed().collect(Collectors.toList());
long priorBoundary = 1;
@ -464,8 +464,8 @@ public class ExactlyOnceSourceIntegrationTest {
List<Long> actualRecordSeqnos = parseAndAssertValuesForSingleTask(sourceRecords);
// Have to sort the records by seqno since we produce to multiple partitions and in-order consumption isn't guaranteed
Collections.sort(actualRecordSeqnos);
assertEquals("Committed records should exclude connector-aborted transactions",
expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size()));
assertEquals(expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size()),
"Committed records should exclude connector-aborted transactions");
}
/**
@ -534,8 +534,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
records.count() >= MINIMUM_MESSAGES);
assertTrue(records.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count());
assertExactlyOnceSeqnos(records, numTasks);
}
@ -597,8 +597,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
records.count() >= MINIMUM_MESSAGES);
assertTrue(records.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + records.count());
// We used at most five tasks during the tests; each of them should have been able to produce records
assertExactlyOnceSeqnos(records, 5);
}
@ -809,8 +809,8 @@ public class ExactlyOnceSourceIntegrationTest {
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
"test-topic")
.count();
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum,
recordNum >= MINIMUM_MESSAGES);
assertTrue(recordNum >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum);
// also consume from the connector's dedicated offsets topic
ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
@ -822,16 +822,16 @@ public class ExactlyOnceSourceIntegrationTest {
);
List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
seqnos.forEach(seqno ->
assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
0, seqno % MINIMUM_MESSAGES)
assertEquals(0, seqno % MINIMUM_MESSAGES,
"Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records")
);
// also consume from the cluster's global offsets topic
offsetRecords = connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), globalOffsetsTopic);
seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
seqnos.forEach(seqno ->
assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
0, seqno % MINIMUM_MESSAGES)
assertEquals(0, seqno % MINIMUM_MESSAGES,
"Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records")
);
// Shut down the whole cluster
@ -876,8 +876,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
sourceRecords.count() >= MINIMUM_MESSAGES);
assertTrue(sourceRecords.count() >= MINIMUM_MESSAGES,
"Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count());
// also have to check which offsets have actually been committed, since we no longer have exactly-once semantics
offsetRecords = connectorTargetedCluster.consumeAll(
CONSUME_RECORDS_TIMEOUT_MS,
@ -941,13 +941,13 @@ public class ExactlyOnceSourceIntegrationTest {
private List<Long> parseAndAssertOffsetsForSingleTask(ConsumerRecords<byte[], byte[]> offsetRecords) {
Map<Integer, List<Long>> parsedOffsets = parseOffsetForTasks(offsetRecords);
assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedOffsets.keySet());
assertEquals(Collections.singleton(0), parsedOffsets.keySet(), "Expected records to only be produced from a single task");
return parsedOffsets.get(0);
}
private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> sourceRecords) {
Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedValues.keySet());
assertEquals(Collections.singleton(0), parsedValues.keySet(), "Expected records to only be produced from a single task");
return parsedValues.get(0);
}
@ -965,7 +965,7 @@ public class ExactlyOnceSourceIntegrationTest {
));
parsedValues.replaceAll((task, values) -> {
Long committedValue = lastCommittedValues.get(task);
assertNotNull("No committed offset found for task " + task, committedValue);
assertNotNull(committedValue, "No committed offset found for task " + task);
return values.stream().filter(v -> v <= committedValue).collect(Collectors.toList());
});
assertSeqnos(parsedValues, numTasks);
@ -973,7 +973,7 @@ public class ExactlyOnceSourceIntegrationTest {
private void assertSeqnos(Map<Integer, List<Long>> parsedValues, int numTasks) {
Set<Integer> expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet());
assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet());
assertEquals(expectedKeys, parsedValues.keySet(), "Expected records to be produced by each task");
parsedValues.forEach((taskId, seqnos) -> {
// We don't check for order here because the records may have been produced to multiple topic partitions,
@ -988,11 +988,11 @@ public class ExactlyOnceSourceIntegrationTest {
// Try to provide the most friendly error message possible if this test fails
assertTrue(
missingSeqnos.isEmpty() && extraSeqnos.isEmpty(),
"Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, " +
"but the actual seqnos did not.\n" +
"Seqnos that should have been emitted but were not: " + missingSeqnos + "\n" +
"seqnos that should not have been emitted but were: " + extraSeqnos,
missingSeqnos.isEmpty() && extraSeqnos.isEmpty()
"seqnos that should not have been emitted but were: " + extraSeqnos
);
});
}
@ -1000,8 +1000,8 @@ public class ExactlyOnceSourceIntegrationTest {
private Map<Integer, List<Long>> parseValuesForTasks(ConsumerRecords<byte[], byte[]> sourceRecords) {
Map<Integer, List<Long>> result = new HashMap<>();
for (ConsumerRecord<byte[], byte[]> sourceRecord : sourceRecords) {
assertNotNull("Record key should not be null", sourceRecord.key());
assertNotNull("Record value should not be null", sourceRecord.value());
assertNotNull(sourceRecord.key(), "Record key should not be null");
assertNotNull(sourceRecord.value(), "Record value should not be null");
String key = new String(sourceRecord.key());
String value = new String(sourceRecord.value());
@ -1009,17 +1009,17 @@ public class ExactlyOnceSourceIntegrationTest {
String keyPrefix = "key-";
String valuePrefix = "value-";
assertTrue("Key should start with \"" + keyPrefix + "\"", key.startsWith(keyPrefix));
assertTrue("Value should start with \"" + valuePrefix + "\"", value.startsWith(valuePrefix));
assertTrue(key.startsWith(keyPrefix), "Key should start with \"" + keyPrefix + "\"");
assertTrue(value.startsWith(valuePrefix), "Value should start with \"" + valuePrefix + "\"");
assertEquals(
"key and value should be identical after prefix",
key.substring(keyPrefix.length()),
value.substring(valuePrefix.length())
value.substring(valuePrefix.length()),
"key and value should be identical after prefix"
);
String[] split = key.substring(keyPrefix.length()).split("-");
assertEquals("Key should match pattern 'key-<connectorName>-<taskId>-<seqno>", 3, split.length);
assertEquals("Key should match pattern 'key-<connectorName>-<taskId>-<seqno>", CONNECTOR_NAME, split[0]);
assertEquals(3, split.length, "Key should match pattern 'key-<connectorName>-<taskId>-<seqno>");
assertEquals(CONNECTOR_NAME, split[0], "Key should match pattern 'key-<connectorName>-<taskId>-<seqno>");
int taskId;
try {
@ -1051,23 +1051,23 @@ public class ExactlyOnceSourceIntegrationTest {
Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
assertNotNull("Offset key should not be null", keyObject);
assertNotNull("Offset value should not be null", valueObject);
assertNotNull(keyObject, "Offset key should not be null");
assertNotNull(valueObject, "Offset value should not be null");
@SuppressWarnings("unchecked")
List<Object> key = assertAndCast(keyObject, List.class, "Key");
assertEquals(
"Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition",
2,
key.size()
key.size(),
"Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition"
);
assertEquals(CONNECTOR_NAME, key.get(0));
@SuppressWarnings("unchecked")
Map<String, Object> partition = assertAndCast(key.get(1), Map.class, "Key[1]");
Object taskIdObject = partition.get("task.id");
assertNotNull("Serialized source partition should contain 'task.id' field from MonitorableSourceConnector", taskIdObject);
assertNotNull(taskIdObject, "Serialized source partition should contain 'task.id' field from MonitorableSourceConnector");
String taskId = assertAndCast(taskIdObject, String.class, "task ID");
assertTrue("task ID should match pattern '<connectorName>-<taskId>", taskId.startsWith(CONNECTOR_NAME + "-"));
assertTrue(taskId.startsWith(CONNECTOR_NAME + "-"), "task ID should match pattern '<connectorName>-<taskId>");
String taskIdRemainder = taskId.substring(CONNECTOR_NAME.length() + 1);
int taskNum;
try {
@ -1080,7 +1080,7 @@ public class ExactlyOnceSourceIntegrationTest {
Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
Object seqnoObject = value.get("saved");
assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
assertNotNull(seqnoObject, "Serialized source offset should contain 'seqno' field from MonitorableSourceConnector");
long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field");
result.computeIfAbsent(taskNum, t -> new ArrayList<>()).add(seqno);
@ -1111,21 +1111,22 @@ public class ExactlyOnceSourceIntegrationTest {
}
private void assertConnectorStarted(StartAndStopLatch connectorStart) throws InterruptedException {
assertTrue("Connector and tasks did not finish startup in time",
assertTrue(
connectorStart.await(
ConnectAssertions.CONNECTOR_SETUP_DURATION_MS,
TimeUnit.MILLISECONDS
)
),
"Connector and tasks did not finish startup in time"
);
}
private void assertConnectorStopped(StartAndStopLatch connectorStop) throws InterruptedException {
assertTrue(
"Connector and tasks did not finish shutdown in time",
connectorStop.await(
ConnectAssertions.CONNECTOR_SHUTDOWN_DURATION_MS,
TimeUnit.MILLISECONDS
)
),
"Connector and tasks did not finish shutdown in time"
);
}
@ -1165,12 +1166,13 @@ public class ExactlyOnceSourceIntegrationTest {
private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> producer, String topic) {
producer.beginTransaction();
assertThrows("Producer should be fenced out",
assertThrows(
ProducerFencedException.class,
() -> {
producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96}));
producer.commitTransaction();
}
},
"Producer should be fenced out"
);
producer.close(Duration.ZERO);
}

View File

@ -19,13 +19,10 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,8 +41,8 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* An example integration test that demonstrates how to setup an integration test for Connect.
@ -53,7 +50,7 @@ import static org.junit.Assert.assertTrue;
* The following test configures and executes up a sink connector pipeline in a worker, produces messages into
* the source topic-partitions, and demonstrates how to check the overall behavior of the pipeline.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class ExampleConnectIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ExampleConnectIntegrationTest.class);
@ -71,10 +68,7 @@ public class ExampleConnectIntegrationTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
@Before
@BeforeEach
public void setup() {
// setup Connect worker properties
Map<String, String> exampleWorkerProps = new HashMap<>();
@ -100,7 +94,7 @@ public class ExampleConnectIntegrationTest {
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
}
@After
@AfterEach
public void close() {
// delete connector handle
RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
@ -156,8 +150,9 @@ public class ExampleConnectIntegrationTest {
}
// consume all records from the source topic or fail, to ensure that they were correctly produced.
assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count());
assertEquals(NUM_RECORDS_PRODUCED,
connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count(),
"Unexpected number of records consumed");
// wait for the connector tasks to consume all records.
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
@ -217,8 +212,8 @@ public class ExampleConnectIntegrationTest {
// consume all records from the source topic or fail, to ensure that they were correctly produced
int recordNum = connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count();
assertTrue("Not enough records produced by source connector. Expected at least: " + NUM_RECORDS_PRODUCED + " + but got " + recordNum,
recordNum >= NUM_RECORDS_PRODUCED);
assertTrue(recordNum >= NUM_RECORDS_PRODUCED,
"Not enough records produced by source connector. Expected at least: " + NUM_RECORDS_PRODUCED + " + but got " + recordNum);
// delete connector
connect.deleteConnector(CONNECTOR_NAME);

View File

@ -25,20 +25,19 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* Integration test for the creation of internal topics.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class InternalTopicsIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
@ -47,13 +46,13 @@ public class InternalTopicsIntegrationTest {
Map<String, String> workerProps = new HashMap<>();
Properties brokerProps = new Properties();
@Before
@BeforeEach
public void setup() {
// setup Kafka broker properties
brokerProps.put("auto.create.topics.enable", String.valueOf(false));
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();

View File

@ -29,15 +29,13 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
@ -68,14 +66,14 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLAS
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration tests for Kafka Connect's connector offset management REST APIs
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class OffsetsApiIntegrationTest {
private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
@ -83,20 +81,18 @@ public class OffsetsApiIntegrationTest {
private static final int NUM_TASKS = 2;
private static final int NUM_RECORDS_PER_PARTITION = 10;
private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
@Rule
public TestName currentTest = new TestName();
private EmbeddedConnectCluster connect;
private String connectorName;
private String topic;
@Before
public void setup() {
connectorName = currentTest.getMethodName();
topic = currentTest.getMethodName();
@BeforeEach
public void setup(TestInfo testInfo) {
connectorName = testInfo.getTestMethod().get().getName();
topic = testInfo.getTestMethod().get().getName();
connect = defaultConnectCluster();
}
@After
@AfterEach
public void tearDown() {
Set<String> remainingConnectors = new HashSet<>(connect.connectors());
if (remainingConnectors.remove(connectorName)) {
@ -104,9 +100,9 @@ public class OffsetsApiIntegrationTest {
}
try {
assertEquals(
"Some connectors were not properly cleaned up after this test",
Collections.emptySet(),
remainingConnectors
remainingConnectors,
"Some connectors were not properly cleaned up after this test"
);
} finally {
// Make a last-ditch effort to clean up the leaked connectors
@ -115,7 +111,7 @@ public class OffsetsApiIntegrationTest {
}
}
@AfterClass
@AfterAll
public static void close() {
// stop all Connect, Kafka and Zk threads.
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);

View File

@ -19,13 +19,11 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,14 +50,14 @@ import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompat
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration tests for incremental cooperative rebalancing between Connect workers
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class RebalanceSourceConnectorsIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(RebalanceSourceConnectorsIntegrationTest.class);
@ -73,12 +71,11 @@ public class RebalanceSourceConnectorsIntegrationTest {
private static final String TOPIC_NAME = "sequential-topic";
private EmbeddedConnectCluster connect;
@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
@Before
public void setup() {
@BeforeEach
public void setup(TestInfo testInfo) {
log.info("Starting test {}", testInfo.getDisplayName());
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
@ -102,8 +99,9 @@ public class RebalanceSourceConnectorsIntegrationTest {
connect.start();
}
@After
public void close() {
@AfterEach
public void close(TestInfo testInfo) {
log.info("Finished test {}", testInfo.getDisplayName());
// stop all Connect, Kafka and Zk threads.
connect.stop();
}
@ -155,8 +153,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
// consume all records from the source topic or fail, to ensure that they were correctly produced
int recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, TOPIC_NAME).count();
assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum,
recordNum >= numRecordsProduced);
assertTrue(recordNum >= numRecordsProduced,
"Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum);
// expect that we're going to restart the connector and its tasks
StartAndStopLatch restartLatch = connectorHandle.expectedStarts(1);
@ -166,9 +164,9 @@ public class RebalanceSourceConnectorsIntegrationTest {
connect.configureConnector(CONNECTOR_NAME, props);
// Wait for the connector *and tasks* to be restarted
assertTrue("Failed to alter connector configuration and see connector and tasks restart "
+ "within " + CONNECTOR_SETUP_DURATION_MS + "ms",
restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
assertTrue(restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS),
"Failed to alter connector configuration and see connector and tasks restart "
+ "within " + CONNECTOR_SETUP_DURATION_MS + "ms");
// And wait for the Connect to show the connectors and tasks are running
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
@ -176,8 +174,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
// consume all records from the source topic or fail, to ensure that they were correctly produced
recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count();
assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum,
recordNum >= numRecordsProduced);
assertTrue(recordNum >= numRecordsProduced,
"Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum);
}
@Test
@ -332,16 +330,16 @@ public class RebalanceSourceConnectorsIntegrationTest {
log.debug("Connector balance: {}", formatAssignment(connectors));
log.debug("Task balance: {}", formatAssignment(tasks));
assertNotEquals("Found no connectors running!", maxConnectors, 0);
assertNotEquals("Found no tasks running!", maxTasks, 0);
assertEquals("Connector assignments are not unique: " + connectors,
connectors.values().size(),
connectors.values().stream().distinct().count());
assertEquals("Task assignments are not unique: " + tasks,
tasks.values().size(),
tasks.values().stream().distinct().count());
assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2);
assertTrue("Tasks are imbalanced: " + formatAssignment(tasks), maxTasks - minTasks < 2);
assertNotEquals(0, maxConnectors, "Found no connectors running!");
assertNotEquals(0, maxTasks, "Found no tasks running!");
assertEquals(connectors.values().size(),
connectors.values().stream().distinct().count(),
"Connector assignments are not unique: " + connectors);
assertEquals(tasks.values().size(),
tasks.values().stream().distinct().count(),
"Task assignments are not unique: " + tasks);
assertTrue(maxConnectors - minConnectors < 2, "Connectors are imbalanced: " + formatAssignment(connectors));
assertTrue(maxTasks - minTasks < 2, "Tasks are imbalanced: " + formatAssignment(tasks));
return true;
} catch (Exception e) {
log.error("Could not check connector state info.", e);

View File

@ -26,10 +26,9 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -46,12 +45,12 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServerConfig.REST_EXTENSION_CLASSES_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* A simple integration test to ensure that REST extensions are registered correctly.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class RestExtensionIntegrationTest {
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
@ -132,7 +131,7 @@ public class RestExtensionIntegrationTest {
}
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();

View File

@ -41,18 +41,19 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import javax.net.ssl.SSLContext;
import java.io.IOException;
@ -74,9 +75,10 @@ import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@SuppressWarnings("unchecked")
@Category(IntegrationTest.class)
@Tag("integration")
public class RestForwardingIntegrationTest {
private Map<String, Object> sslConfig;
@ -93,13 +95,13 @@ public class RestForwardingIntegrationTest {
private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses;
@Before
@BeforeEach
public void setUp() throws IOException, GeneralSecurityException {
sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
responses = new ArrayList<>();
}
@After
@AfterEach
public void tearDown() throws IOException {
for (CloseableHttpResponse response: responses) {
response.close();

View File

@ -19,11 +19,10 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,13 +40,13 @@ import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
import static org.apache.kafka.connect.runtime.rest.InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER;
import static org.apache.kafka.connect.runtime.rest.InternalRequestSignature.SIGNATURE_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* A simple integration test to ensure that internal request validation becomes enabled with the
* "sessioned" protocol.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class SessionedProtocolIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(SessionedProtocolIntegrationTest.class);
@ -58,7 +57,7 @@ public class SessionedProtocolIntegrationTest {
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@Before
@BeforeEach
public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
@ -79,7 +78,7 @@ public class SessionedProtocolIntegrationTest {
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();

View File

@ -22,11 +22,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
@ -48,13 +47,13 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL
import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test for sink connectors
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class SinkConnectorsIntegrationTest {
private static final int NUM_TASKS = 1;
@ -64,7 +63,7 @@ public class SinkConnectorsIntegrationTest {
private EmbeddedConnectCluster connect;
@Before
@BeforeEach
public void setup() throws Exception {
Map<String, String> workerProps = new HashMap<>();
// permit all Kafka client overrides; required for testing different consumer partition assignment strategies
@ -85,7 +84,7 @@ public class SinkConnectorsIntegrationTest {
connect.start();
}
@After
@AfterEach
public void close() {
// delete connector handle
RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
@ -111,7 +110,7 @@ public class SinkConnectorsIntegrationTest {
"5000");
final Set<String> consumedRecordValues = new HashSet<>();
Consumer<SinkRecord> onPut = record -> assertTrue("Task received duplicate record from Connect", consumedRecordValues.add(Objects.toString(record.value())));
Consumer<SinkRecord> onPut = record -> assertTrue(consumedRecordValues.add(Objects.toString(record.value())), "Task received duplicate record from Connect");
ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
TaskHandle task = connector.taskHandle(CONNECTOR_NAME + "-0", onPut);
@ -217,7 +216,7 @@ public class SinkConnectorsIntegrationTest {
"5000");
final Set<String> consumedRecordValues = new HashSet<>();
Consumer<SinkRecord> onPut = record -> assertTrue("Task received duplicate record from Connect", consumedRecordValues.add(Objects.toString(record.value())));
Consumer<SinkRecord> onPut = record -> assertTrue(consumedRecordValues.add(Objects.toString(record.value())), "Task received duplicate record from Connect");
ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
TaskHandle task = connector.taskHandle(CONNECTOR_NAME + "-0", onPut);

View File

@ -19,11 +19,10 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
@ -51,7 +50,7 @@ import static org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.DEFA
* Integration test for source connectors with a focus on topic creation with custom properties by
* the connector tasks.
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class SourceConnectorsIntegrationTest {
private static final int NUM_WORKERS = 3;
@ -72,7 +71,7 @@ public class SourceConnectorsIntegrationTest {
Map<String, String> workerProps = new HashMap<>();
Properties brokerProps = new Properties();
@Before
@BeforeEach
public void setup() {
// setup Connect worker properties
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
@ -89,7 +88,7 @@ public class SourceConnectorsIntegrationTest {
.maskExitProcedures(true); // true is the default, setting here as example
}
@After
@AfterEach
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();

View File

@ -21,11 +21,10 @@ import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
@ -43,13 +42,13 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Category(IntegrationTest.class)
@Tag("integration")
public class StandaloneWorkerIntegrationTest {
private static final String CONNECTOR_NAME = "test-connector";
@ -58,14 +57,14 @@ public class StandaloneWorkerIntegrationTest {
private EmbeddedConnectStandalone connect;
@Before
@BeforeEach
public void setup() {
connect = new EmbeddedConnectStandalone.Builder()
.build();
connect.start();
}
@After
@AfterEach
public void cleanup() {
connect.stop();
}
@ -73,15 +72,15 @@ public class StandaloneWorkerIntegrationTest {
@Test
public void testDynamicLogging() {
Map<String, LoggerLevel> initialLevels = connect.allLogLevels();
assertFalse("Connect REST API did not list any known loggers", initialLevels.isEmpty());
assertFalse(initialLevels.isEmpty(), "Connect REST API did not list any known loggers");
Map<String, LoggerLevel> invalidModifiedLoggers = Utils.filterMap(
initialLevels,
StandaloneWorkerIntegrationTest::isModified
);
assertEquals(
"No loggers should have a non-null last-modified timestamp",
Collections.emptyMap(),
invalidModifiedLoggers
invalidModifiedLoggers,
"No loggers should have a non-null last-modified timestamp"
);
// Tests with no scope
@ -103,9 +102,9 @@ public class StandaloneWorkerIntegrationTest {
connect.setLogLevel(namespace2, level2, "worker");
LoggerLevel currentLoggerLevel = connect.getLogLevel(namespace2);
assertEquals(
"Log level and last-modified timestamp should not be affected by consecutive identical requests",
priorLoggerLevel,
currentLoggerLevel
currentLoggerLevel,
"Log level and last-modified timestamp should not be affected by consecutive identical requests"
);
// Tests with scope=cluster
@ -125,8 +124,8 @@ public class StandaloneWorkerIntegrationTest {
List<String> affectedLoggers = connect.setLogLevel(namespace, level, scope);
if ("cluster".equals(scope)) {
assertNull(
"Modifying log levels with scope=cluster should result in an empty response",
affectedLoggers
affectedLoggers,
"Modifying log levels with scope=cluster should result in an empty response"
);
} else {
assertTrue(affectedLoggers.contains(namespace));
@ -134,10 +133,10 @@ public class StandaloneWorkerIntegrationTest {
.filter(l -> !l.startsWith(namespace))
.collect(Collectors.toList());
assertEquals(
"No loggers outside the namespace '" + namespace
+ "' should have been included in the response for a request to modify that namespace",
Collections.emptyList(),
invalidAffectedLoggers
invalidAffectedLoggers,
"No loggers outside the namespace '" + namespace
+ "' should have been included in the response for a request to modify that namespace"
);
}
@ -148,9 +147,9 @@ public class StandaloneWorkerIntegrationTest {
assertEquals(level, loggerLevel.level());
assertNotNull(loggerLevel.lastModified());
assertTrue(
loggerLevel.lastModified() >= requestTime,
"Last-modified timestamp for logger level is " + loggerLevel.lastModified()
+ ", which is before " + requestTime + ", the most-recent time the level was adjusted",
loggerLevel.lastModified() >= requestTime
+ ", which is before " + requestTime + ", the most-recent time the level was adjusted"
);
// Verify information for all listed loggers
@ -166,23 +165,23 @@ public class StandaloneWorkerIntegrationTest {
)
);
assertEquals(
Collections.emptyMap(),
invalidAffectedLoggerLevels,
"At least one logger in the affected namespace '" + namespace
+ "' does not have the expected level of '" + level
+ "', has a null last-modified timestamp, or has a last-modified timestamp "
+ "that is less recent than " + requestTime
+ ", which is when the namespace was last adjusted",
Collections.emptyMap(),
invalidAffectedLoggerLevels
+ ", which is when the namespace was last adjusted"
);
Set<String> droppedLoggers = Utils.diff(HashSet::new, initialLevels.keySet(), newLevels.keySet());
assertEquals(
Collections.emptySet(),
droppedLoggers,
"At least one logger was present in the listing of all loggers "
+ "before the logging level for namespace '" + namespace
+ "' was set to '" + level
+ "' that is no longer present",
Collections.emptySet(),
droppedLoggers
+ "' that is no longer present"
);
Map<String, LoggerLevel> invalidUnaffectedLoggerLevels = Utils.filterMap(
@ -190,12 +189,12 @@ public class StandaloneWorkerIntegrationTest {
e -> !hasNamespace(e, namespace) && !e.getValue().equals(initialLevels.get(e.getKey()))
);
assertEquals(
Collections.emptyMap(),
invalidUnaffectedLoggerLevels,
"At least one logger outside of the affected namespace '" + namespace
+ "' has a different logging level or last-modified timestamp than it did "
+ "before the namespace was set to level '" + level
+ "'; none of these loggers should have been affected",
Collections.emptyMap(),
invalidUnaffectedLoggerLevels
+ "'; none of these loggers should have been affected"
);
return newLevels;

View File

@ -24,17 +24,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Category(IntegrationTest.class)
@Tag("integration")
public class StartAndStopCounterTest {
private StartAndStopCounter counter;
@ -42,13 +41,13 @@ public class StartAndStopCounterTest {
private ExecutorService waiters;
private StartAndStopLatch latch;
@Before
@BeforeEach
public void setup() {
clock = new MockTime();
counter = new StartAndStopCounter(clock);
}
@After
@AfterEach
public void teardown() {
if (waiters != null) {
try {

View File

@ -27,16 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Category(IntegrationTest.class)
@Tag("integration")
public class StartAndStopLatchTest {
private final AtomicBoolean completed = new AtomicBoolean();
@ -46,13 +45,13 @@ public class StartAndStopLatchTest {
private ExecutorService waiters;
private Future<Boolean> future;
@Before
@BeforeEach
public void setup() {
clock = new MockTime();
waiters = Executors.newSingleThreadExecutor();
}
@After
@AfterEach
public void teardown() {
if (waiters != null) {
waiters.shutdownNow();

View File

@ -23,11 +23,10 @@ import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,13 +48,13 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* An integration test for connectors with transformations
*/
@Category(IntegrationTest.class)
@Tag("integration")
public class TransformationIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(TransformationIntegrationTest.class);
@ -159,10 +158,16 @@ public class TransformationIntegrationTest {
}
// consume all records from the source topic or fail, to ensure that they were correctly produced.
assertEquals("Unexpected number of records consumed", numFooRecords,
connect.kafka().consume(numFooRecords, RECORD_TRANSFER_DURATION_MS, fooTopic).count());
assertEquals("Unexpected number of records consumed", numBarRecords,
connect.kafka().consume(numBarRecords, RECORD_TRANSFER_DURATION_MS, barTopic).count());
assertEquals(
numFooRecords,
connect.kafka().consume(numFooRecords, RECORD_TRANSFER_DURATION_MS, fooTopic).count(),
"Unexpected number of records consumed"
);
assertEquals(
numBarRecords,
connect.kafka().consume(numBarRecords, RECORD_TRANSFER_DURATION_MS, barTopic).count(),
"Unexpected number of records consumed"
);
// wait for the connector tasks to consume all records.
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
@ -239,8 +244,11 @@ public class TransformationIntegrationTest {
}
// consume all records from the source topic or fail, to ensure that they were correctly produced.
assertEquals("Unexpected number of records consumed", numRecords,
connect.kafka().consume(numRecords, RECORD_TRANSFER_DURATION_MS, topic).count());
assertEquals(
numRecords,
connect.kafka().consume(numRecords, RECORD_TRANSFER_DURATION_MS, topic).count(),
"Unexpected number of records consumed"
);
// wait for the connector tasks to consume all records.
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
@ -305,8 +313,7 @@ public class TransformationIntegrationTest {
// consume all records from the source topic or fail, to ensure that they were correctly produced
for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "test-topic")) {
assertNotNull("Expected header to exist",
record.headers().lastHeader("header-8"));
assertNotNull(record.headers().lastHeader("header-8"), "Expected header to exist");
}
// delete connector