MINOR: Small Connect integration test fixes (#8100)

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Konstantine Karantasis 2020-02-12 15:40:37 -08:00 committed by GitHub
parent f51e1e6548
commit 97d2c726f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 52 additions and 57 deletions

View File

@ -16,17 +16,13 @@
*/
package org.apache.kafka.connect.mirror;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -34,19 +30,18 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import java.util.Properties;
import java.time.Duration;
import java.util.Set;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.apache.kafka.test.TestUtils.waitForCondition;
/**
* Tests MM2 replication and failover/failback logic.
@ -71,7 +66,7 @@ public class MirrorConnectorsIntegrationTest {
private EmbeddedConnectCluster backup;
@Before
public void setup() throws IOException, InterruptedException {
public void setup() throws InterruptedException {
Properties brokerProps = new Properties();
brokerProps.put("auto.create.topics.enable", "false");
@ -116,7 +111,11 @@ public class MirrorConnectorsIntegrationTest {
.build();
primary.start();
primary.assertions().assertAtLeastNumWorkersAreUp(3,
"Workers of primary-connect-cluster did not start in time.");
backup.start();
primary.assertions().assertAtLeastNumWorkersAreUp(3,
"Workers of backup-connect-cluster did not start in time.");
// create these topics before starting the connectors so we don't need to wait for discovery
primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
@ -189,30 +188,13 @@ public class MirrorConnectorsIntegrationTest {
private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
Set<String> connNames) throws InterruptedException {
for (String connector : connNames) {
TestUtils.waitForCondition(() -> areConnectorAndTasksRunning(connectCluster,
connector), "Timed out trying to verify connector " +
connector + " was up!");
}
}
private boolean areConnectorAndTasksRunning(EmbeddedConnectCluster connectCluster,
String connectorName) {
try {
ConnectorStateInfo info = connectCluster.connectorStatus(connectorName);
boolean result = info != null
&& !info.tasks().isEmpty()
&& info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
&& info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
log.debug("Found connector and tasks running: {}", result);
return result;
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return false;
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
"Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
}
}
@After
public void close() throws IOException {
public void close() {
for (String x : primary.connectors()) {
primary.deleteConnector(x);
}

View File

@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -66,7 +65,7 @@ public class ConnectWorkerIntegrationTest {
Properties brokerProps = new Properties();
@Before
public void setup() throws IOException {
public void setup() {
// setup Connect worker properties
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");

View File

@ -29,7 +29,6 @@ import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -50,7 +49,6 @@ public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
@After
public void close() {
}
@ -73,7 +71,7 @@ public class ConnectorClientPolicyIntegrationTest {
@Test
public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
Map<String, String> props = basicConnectorConfig();
props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
assertPassCreateConnector("Principal", props);
}
@ -85,7 +83,7 @@ public class ConnectorClientPolicyIntegrationTest {
assertPassCreateConnector("All", props);
}
private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException {
private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
@ -106,10 +104,13 @@ public class ConnectorClientPolicyIntegrationTest {
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
return connect;
}
private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException {
private void assertFailCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
@ -121,10 +122,12 @@ public class ConnectorClientPolicyIntegrationTest {
}
}
private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException {
private void assertPassCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
} catch (ConnectRestException e) {
fail("Should be able to create connector");
} finally {

View File

@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
private static final String TASK_ID = "error-conn-0";
@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
public void setup() throws IOException {
public void setup() throws InterruptedException {
// setup Connect cluster with defaults
connect = new EmbeddedConnectCluster.Builder().build();
// start Connect cluster
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest {
connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
waitForCondition(this::checkForPartitionAssignment,
CONNECTOR_SETUP_DURATION_MS,
@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
"Connector tasks did not stop in time.");
}
/**

View File

@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
public void setup() throws IOException {
public void setup() {
// setup Connect worker properties
Map<String, String> exampleWorkerProps = new HashMap<>();
exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));

View File

@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -70,7 +69,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
private EmbeddedConnectCluster connect;
@Before
public void setup() throws IOException {
public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());

View File

@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest {
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final int NUM_WORKERS = 1;
private EmbeddedConnectCluster connect;
@Test
public void testRestExtensionApi() throws IOException, InterruptedException {
public void testRestExtensionApi() throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(1)
.numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.build();
@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest {
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest {
connectorHandle.taskHandle(connectorHandle.name() + "-0");
StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
connect.configureConnector(connectorHandle.name(), connectorProps);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1,
"Connector tasks did not start in time.");
connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());

View File

@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -61,7 +60,7 @@ public class SessionedProtocolIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
public void setup() throws IOException {
public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, ConnectProtocolCompatibility.SESSIONED.protocol());

View File

@ -284,7 +284,8 @@ public class EmbeddedConnectCluster {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
}
throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
throw new ConnectRestException(response.getStatus(),
"Could not execute PUT request. Error response: " + responseToString(response));
}
/**
@ -298,7 +299,8 @@ public class EmbeddedConnectCluster {
String url = endpointForResource(String.format("connectors/%s", connName));
Response response = requestDelete(url);
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
throw new ConnectRestException(response.getStatus(),
"Could not execute DELETE request. Error response: " + responseToString(response));
}
}
@ -358,7 +360,7 @@ public class EmbeddedConnectCluster {
*
* @param resource the resource under the worker's admin endpoint
* @return the admin endpoint URL
* @throws ConnectRestException if no admin REST endpoint is available
* @throws ConnectException if no admin REST endpoint is available
*/
public String adminEndpoint(String resource) {
String url = connectCluster.stream()
@ -375,7 +377,7 @@ public class EmbeddedConnectCluster {
*
* @param resource the resource under the worker's admin endpoint
* @return the admin endpoint URL
* @throws ConnectRestException if no REST endpoint is available
* @throws ConnectException if no REST endpoint is available
*/
public String endpointForResource(String resource) {
String url = connectCluster.stream()