KAFKA-15988: Reuse embedded clusters across test cases in Connect OffsetsApiIntegrationTest suite (#14966)

Reviewers: Sudesh Wasnik <swasnik@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Chris Egerton 2024-01-09 10:32:39 -05:00 committed by GitHub
parent bdad163182
commit c7e1fdca64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 267 additions and 203 deletions

View File

@ -73,7 +73,7 @@ public class ConnectorRestartApiIntegrationTest {
private static final String TOPIC_NAME = "test-topic";
private static Map<Integer, EmbeddedConnectCluster> connectClusterMap = new ConcurrentHashMap<>();
private static final Map<Integer, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@ -91,7 +91,7 @@ public class ConnectorRestartApiIntegrationTest {
}
private void startOrReuseConnectWithNumWorkers(int numWorkers) throws Exception {
connect = connectClusterMap.computeIfAbsent(numWorkers, n -> {
connect = CONNECT_CLUSTERS.computeIfAbsent(numWorkers, n -> {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
@ -125,7 +125,7 @@ public class ConnectorRestartApiIntegrationTest {
@AfterClass
public static void close() {
// stop all Connect, Kafka and Zk threads.
connectClusterMap.values().forEach(EmbeddedConnectCluster::stop);
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
}
@Test

View File

@ -30,22 +30,31 @@ import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.test.NoRetryException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@ -56,6 +65,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@ -70,41 +80,85 @@ public class OffsetsApiIntegrationTest {
private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);
private static final int NUM_WORKERS = 3;
private static final String CONNECTOR_NAME = "test-connector";
private static final String TOPIC = "test-topic";
private static final int NUM_TASKS = 2;
private static final int NUM_RECORDS_PER_PARTITION = 10;
private Map<String, String> workerProps;
private EmbeddedConnectCluster.Builder connectBuilder;
private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>();
@Rule
public TestName currentTest = new TestName();
private EmbeddedConnectCluster connect;
private String connectorName;
private String topic;
@Before
public void setup() {
Properties brokerProps = new Properties();
brokerProps.put("transaction.state.log.replication.factor", "1");
brokerProps.put("transaction.state.log.min.isr", "1");
// setup Connect worker properties
workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
// build a Connect cluster backed by Kafka and Zk
connectBuilder = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.brokerProps(brokerProps)
.workerProps(workerProps);
connectorName = currentTest.getMethodName();
topic = currentTest.getMethodName();
connect = defaultConnectCluster();
}
@After
public void tearDown() {
connect.stop();
Set<String> remainingConnectors = new HashSet<>(connect.connectors());
if (remainingConnectors.remove(connectorName)) {
connect.deleteConnector(connectorName);
}
try {
assertEquals(
"Some connectors were not properly cleaned up after this test",
Collections.emptySet(),
remainingConnectors
);
} finally {
// Make a last-ditch effort to clean up the leaked connectors
// so as not to interfere with other test cases
remainingConnectors.forEach(connect::deleteConnector);
}
}
@AfterClass
public static void close() {
// stop all Connect, Kafka and Zk threads.
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
}
private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
Properties brokerProps = new Properties();
brokerProps.put("transaction.state.log.replication.factor", "1");
brokerProps.put("transaction.state.log.min.isr", "1");
// Have to declare a new map since the passed-in one may be immutable
Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps);
// Enable fast offset commits by default
workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.brokerProps(brokerProps)
.workerProps(workerPropsWithDefaults)
.build();
result.start();
return result;
});
}
private static EmbeddedConnectCluster defaultConnectCluster() {
return createOrReuseConnectWithWorkerProps(Collections.emptyMap());
}
private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() {
Map<String, String> workerProps = Collections.singletonMap(
DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
"enabled"
);
return createOrReuseConnectWithWorkerProps(workerProps);
}
@Test
public void testGetNonExistentConnectorOffsets() {
connect = connectBuilder.build();
connect.start();
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.connectorOffsets("non-existent-connector"));
assertEquals(404, e.errorCode());
@ -112,32 +166,29 @@ public class OffsetsApiIntegrationTest {
@Test
public void testGetSinkConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
}
@Test
public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
"overridden-group-id");
String overriddenGroupId = connectorName + "-overridden-group-id";
connectorConfigs.put(
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
overriddenGroupId
);
getAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
}
@Test
public void testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -154,54 +205,49 @@ public class OffsetsApiIntegrationTest {
}
private void getAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
kafkaCluster.createTopic(TOPIC, 5);
kafkaCluster.createTopic(topic, 5);
// Produce records to each partition
for (int partition = 0; partition < 5; partition++) {
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
kafkaCluster.produce(TOPIC, partition, "key", "value");
kafkaCluster.produce(topic, partition, "key", "value");
}
}
// Create sink connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
// Produce more records to each partition
for (int partition = 0; partition < 5; partition++) {
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
kafkaCluster.produce(TOPIC, partition, "key", "value");
kafkaCluster.produce(topic, partition, "key", "value");
}
}
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 5, 2 * NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, 5, 2 * NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
}
@Test
public void testGetSourceConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
getAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
@Test
public void testGetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
String connectorOffsetsTopic = connectorName + "-custom-offsets-topic";
connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, connectorOffsetsTopic);
getAndVerifySourceConnectorOffsets(connectorConfigs);
}
@Test
public void testGetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -219,25 +265,23 @@ public class OffsetsApiIntegrationTest {
private void getAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
// Create source connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
// Each task should produce more records
connectorConfigs.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(2 * NUM_RECORDS_PER_PARTITION));
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.configureConnector(connectorName, connectorConfigs);
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 2 * NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
}
@Test
public void testAlterOffsetsNonExistentConnector() throws Exception {
connect = connectBuilder.build();
connect.start();
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList(
new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap())))));
@ -246,67 +290,62 @@ public class OffsetsApiIntegrationTest {
@Test
public void testAlterOffsetsNonStoppedConnector() throws Exception {
connect = connectBuilder.build();
connect.start();
// Create source connector
connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, baseSourceConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
List<ConnectorOffset> offsets = new ArrayList<>();
// The MonitorableSourceConnector has a source partition per task
for (int i = 0; i < NUM_TASKS; i++) {
offsets.add(
new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
Collections.singletonMap("saved", 5))
);
}
// Try altering offsets for a running connector
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets)));
() -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets)));
assertEquals(400, e.errorCode());
connect.pauseConnector(CONNECTOR_NAME);
connect.pauseConnector(connectorName);
connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
CONNECTOR_NAME,
connectorName,
NUM_TASKS,
"Connector did not pause in time"
);
// Try altering offsets for a paused (not stopped) connector
e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsets)));
() -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsets)));
assertEquals(400, e.errorCode());
}
@Test
public void testAlterSinkConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
alterAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
}
@Test
public void testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
"overridden-group-id");
String overriddenGroupId = connectorName + "-overridden-group-id";
connectorConfigs.put(
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
overriddenGroupId
);
alterAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
}
@Test
public void testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -324,127 +363,125 @@ public class OffsetsApiIntegrationTest {
private void alterAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
int numPartitions = 3;
kafkaCluster.createTopic(TOPIC, numPartitions);
kafkaCluster.createTopic(topic, numPartitions);
// Produce records to each partition
for (int partition = 0; partition < numPartitions; partition++) {
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
kafkaCluster.produce(TOPIC, partition, "key", "value");
kafkaCluster.produce(topic, partition, "key", "value");
}
}
// Create sink connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
// Delete the offset of one partition; alter the offsets of the others
List<ConnectorOffset> offsetsToAlter = new ArrayList<>();
Map<String, Object> partition = new HashMap<>();
partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
offsetsToAlter.add(new ConnectorOffset(partition, null));
for (int i = 1; i < numPartitions; i++) {
partition = new HashMap<>();
partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 5)));
}
String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
// Alter the sink connector's offsets, with retry logic (since we just stopped the connector)
String response = modifySinkConnectorOffsetsWithRetry(new ConnectorOffsets(offsetsToAlter));
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
"However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 5,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 5,
"Sink connector consumer group offsets should reflect the altered offsets");
// Update the connector's configs; this time expect SinkConnector::alterOffsets to return true
connectorConfigs.put(MonitorableSinkConnector.ALTER_OFFSETS_RESULT, "true");
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.configureConnector(connectorName, connectorConfigs);
// Alter offsets again while the connector is still in a stopped state
offsetsToAlter.clear();
for (int i = 1; i < numPartitions; i++) {
partition = new HashMap<>();
partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
partition.put(SinkUtils.KAFKA_PARTITION_KEY, i);
offsetsToAlter.add(new ConnectorOffset(partition, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 3)));
}
response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
assertThat(response, containsString("The offsets for this connector have been altered successfully"));
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions - 1, 3,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions - 1, 3,
"Sink connector consumer group offsets should reflect the altered offsets");
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.resumeConnector(connectorName);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
connectorName,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
}
@Test
public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
connect = connectBuilder.build();
connect.start();
connect.kafka().createTopic(TOPIC, 1);
connect.kafka().createTopic(topic, 1);
// Produce records
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
connect.kafka().produce(TOPIC, 0, "key", "value");
connect.kafka().produce(topic, 0, "key", "value");
}
// Configure a sink connector whose sink task blocks in its stop method
Map<String, String> connectorConfigs = new HashMap<>();
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
connectorConfigs.put(TOPICS_CONFIG, TOPIC);
connectorConfigs.put(TOPICS_CONFIG, topic);
connectorConfigs.put("block", "Task::stop");
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
"Connector tasks did not start in time.");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
// Try to delete the offsets for the single topic partition
Map<String, Object> partition = new HashMap<>();
partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
partition.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, null));
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)));
() -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
assertThat(e.getMessage(), containsString("zombie sink task"));
}
@Test
public void testAlterSinkConnectorOffsetsInvalidRequestBody() throws Exception {
connect = connectBuilder.build();
connect.start();
// Create a sink connector and stop it
connect.configureConnector(CONNECTOR_NAME, baseSinkConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, baseSinkConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName));
String content = "{}";
try (Response response = connect.requestPatch(url, content)) {
@ -497,15 +534,11 @@ public class OffsetsApiIntegrationTest {
@Test
public void testAlterSourceConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
@Test
public void testAlterSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
alterAndVerifySourceConnectorOffsets(connectorConfigs);
@ -513,8 +546,6 @@ public class OffsetsApiIntegrationTest {
@Test
public void testAlterSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -532,25 +563,23 @@ public class OffsetsApiIntegrationTest {
@Test
public void testAlterSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
connect = connectBuilder.workerProps(workerProps).build();
connect.start();
connect = exactlyOnceSourceConnectCluster();
alterAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
public void alterAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
// Create source connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
@ -558,63 +587,61 @@ public class OffsetsApiIntegrationTest {
// The MonitorableSourceConnector has a source partition per task
for (int i = 0; i < NUM_TASKS; i++) {
offsetsToAlter.add(
new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
Collections.singletonMap("saved", 5))
);
}
String response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
String response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been altered successfully. " +
"However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."));
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 5,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 5,
"Source connector offsets should reflect the altered offsets");
// Update the connector's configs; this time expect SourceConnector::alterOffsets to return true
connectorConfigs.put(MonitorableSourceConnector.ALTER_OFFSETS_RESULT, "true");
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.configureConnector(connectorName, connectorConfigs);
// Alter offsets again while connector is in stopped state
offsetsToAlter = new ArrayList<>();
// The MonitorableSourceConnector has a source partition per task
for (int i = 0; i < NUM_TASKS; i++) {
offsetsToAlter.add(
new ConnectorOffset(Collections.singletonMap("task.id", CONNECTOR_NAME + "-" + i),
new ConnectorOffset(Collections.singletonMap("task.id", connectorName + "-" + i),
Collections.singletonMap("saved", 7))
);
}
response = connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
response = connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter));
assertThat(response, containsString("The offsets for this connector have been altered successfully"));
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, 7,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, 7,
"Source connector offsets should reflect the altered offsets");
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.resumeConnector(connectorName);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
connectorName,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
}
@Test
public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
connect = connectBuilder.build();
connect.start();
// Create a source connector and stop it
connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, baseSourceConnectorConfigs());
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
String url = connect.endpointForResource(String.format("connectors/%s/offsets", connectorName));
String content = "[]";
try (Response response = connect.requestPatch(url, content)) {
@ -667,31 +694,28 @@ public class OffsetsApiIntegrationTest {
@Test
public void testResetSinkConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
}
@Test
public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
"overridden-group-id");
String overriddenGroupId = connectorName + "-overridden-group-id";
connectorConfigs.put(
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
overriddenGroupId
);
resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
// Ensure that the overridden consumer group ID was the one actually used
try (Admin admin = connect.kafka().createAdminClient()) {
Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> overriddenGroupId.equals(consumerGroupListing.groupId())));
assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(connectorName).equals(consumerGroupListing.groupId())));
}
}
@Test
public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -709,95 +733,89 @@ public class OffsetsApiIntegrationTest {
private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
int numPartitions = 3;
kafkaCluster.createTopic(TOPIC, numPartitions);
kafkaCluster.createTopic(topic, numPartitions);
// Produce records to each partition
for (int partition = 0; partition < numPartitions; partition++) {
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
kafkaCluster.produce(TOPIC, partition, "key", "value");
kafkaCluster.produce(topic, partition, "key", "value");
}
}
// Create sink connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
// Reset the sink connector's offsets
String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
// Reset the sink connector's offsets, with retry logic (since we just stopped the connector)
String response = modifySinkConnectorOffsetsWithRetry(null);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
verifyEmptyConnectorOffsets(connectorName);
// Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
response = connect.resetConnectorOffsets(CONNECTOR_NAME);
response = connect.resetConnectorOffsets(connectorName);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
verifyEmptyConnectorOffsets(connectorName);
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.resumeConnector(connectorName);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
connectorName,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, numPartitions, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
}
@Test
public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
connect = connectBuilder.build();
connect.start();
connect.kafka().createTopic(TOPIC, 1);
connect.kafka().createTopic(topic, 1);
// Produce records
for (int record = 0; record < NUM_RECORDS_PER_PARTITION; record++) {
connect.kafka().produce(TOPIC, 0, "key", "value");
connect.kafka().produce(topic, 0, "key", "value");
}
// Configure a sink connector whose sink task blocks in its stop method
Map<String, String> connectorConfigs = new HashMap<>();
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
connectorConfigs.put(TOPICS_CONFIG, TOPIC);
connectorConfigs.put(TOPICS_CONFIG, topic);
connectorConfigs.put("block", "Task::stop");
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, 1,
"Connector tasks did not start in time.");
verifyExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, NUM_RECORDS_PER_PARTITION,
verifyExpectedSinkConnectorOffsets(connectorName, topic, 1, NUM_RECORDS_PER_PARTITION,
"Sink connector consumer group offsets should catch up to the topic end offsets");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
// Try to reset the offsets
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName));
assertThat(e.getMessage(), containsString("zombie sink task"));
}
@Test
public void testResetSourceConnectorOffsets() throws Exception {
connect = connectBuilder.build();
connect.start();
resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
@Test
public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
connect = connectBuilder.build();
connect.start();
Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
resetAndVerifySourceConnectorOffsets(connectorConfigs);
@ -805,8 +823,6 @@ public class OffsetsApiIntegrationTest {
@Test
public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
connect = connectBuilder.build();
connect.start();
EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
try (AutoCloseable ignored = kafkaCluster::stop) {
@ -824,50 +840,48 @@ public class OffsetsApiIntegrationTest {
@Test
public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
connect = connectBuilder.workerProps(workerProps).build();
connect.start();
connect = exactlyOnceSourceConnectCluster();
resetAndVerifySourceConnectorOffsets(baseSourceConnectorConfigs());
}
public void resetAndVerifySourceConnectorOffsets(Map<String, String> connectorConfigs) throws Exception {
// Create source connector
connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
connect.configureConnector(connectorName, connectorConfigs);
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS,
"Connector tasks did not start in time.");
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
connect.stopConnector(CONNECTOR_NAME);
connect.stopConnector(connectorName);
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
connectorName,
"Connector did not stop in time"
);
// Reset the source connector's offsets
String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
String response = connect.resetConnectorOffsets(connectorName);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
verifyEmptyConnectorOffsets(connectorName);
// Reset the source connector's offsets again while it is still in a STOPPED state and ensure that there is no error
response = connect.resetConnectorOffsets(CONNECTOR_NAME);
response = connect.resetConnectorOffsets(connectorName);
assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
"However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
verifyEmptyConnectorOffsets(CONNECTOR_NAME);
verifyEmptyConnectorOffsets(connectorName);
// Resume the connector and expect its offsets to catch up to the latest offsets
connect.resumeConnector(CONNECTOR_NAME);
connect.resumeConnector(connectorName);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
connectorName,
NUM_TASKS,
"Connector tasks did not resume in time"
);
verifyExpectedSourceConnectorOffsets(CONNECTOR_NAME, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
verifyExpectedSourceConnectorOffsets(connectorName, NUM_TASKS, NUM_RECORDS_PER_PARTITION,
"Source connector offsets should reflect the expected number of records produced");
}
@ -875,7 +889,7 @@ public class OffsetsApiIntegrationTest {
Map<String, String> configs = new HashMap<>();
configs.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
configs.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
configs.put(TOPICS_CONFIG, TOPIC);
configs.put(TOPICS_CONFIG, topic);
configs.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
configs.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return configs;
@ -885,7 +899,7 @@ public class OffsetsApiIntegrationTest {
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPIC_CONFIG, TOPIC);
props.put(TOPIC_CONFIG, topic);
props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "3");
props.put(MonitorableSourceConnector.MAX_MESSAGES_PRODUCED_CONFIG, String.valueOf(NUM_RECORDS_PER_PARTITION));
props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@ -895,6 +909,56 @@ public class OffsetsApiIntegrationTest {
return props;
}
/**
* Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to
* handle cases where laggy task shutdown may have left a consumer in the group.
* @param offsetsToAlter the offsets to alter for the sink connector, or null if
* the connector's offsets should be reset instead
* @return the response from the REST API, if the request was successful
* @throws InterruptedException if the thread is interrupted while waiting for a
* request to modify the connector's offsets to succeed
* @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
*/
private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException {
// Some retry logic is necessary to account for KAFKA-15826,
// where laggy sink task startup/shutdown can leave consumers running
String modifyVerb = offsetsToAlter != null ? "alter" : "reset";
String conditionDetails = "Failed to " + modifyVerb + " sink connector offsets in time";
AtomicReference<String> responseReference = new AtomicReference<>();
waitForCondition(
() -> {
try {
if (offsetsToAlter == null) {
responseReference.set(connect.resetConnectorOffsets(connectorName));
} else {
responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter));
}
return true;
} catch (ConnectRestException e) {
boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
String message = Optional.of(e.getMessage()).orElse("");
boolean failedToModifyConsumerOffsets = message.contains(
"Failed to " + modifyVerb + " consumer group offsets for connector"
);
boolean canBeRetried = message.contains("If the connector is in a stopped state, this operation can be safely retried");
boolean retriable = internalServerError && failedToModifyConsumerOffsets && canBeRetried;
if (retriable) {
return false;
} else {
throw new NoRetryException(e);
}
} catch (Throwable t) {
throw new NoRetryException(t);
}
},
30_000,
conditionDetails
);
return responseReference.get();
}
/**
* Verify whether the actual consumer group offsets for a sink connector match the expected offsets. The verification
* is done using the <strong><em>GET /connectors/{connector}/offsets</em></strong> REST API which is repeatedly queried
@ -914,7 +978,7 @@ public class OffsetsApiIntegrationTest {
*/
private void verifyExpectedSinkConnectorOffsets(String connectorName, String expectedTopic, int expectedPartitions,
int expectedOffset, String conditionDetails) throws InterruptedException {
TestUtils.waitForCondition(() -> {
waitForCondition(() -> {
ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
if (offsets.offsets().size() != expectedPartitions) {
return false;
@ -944,14 +1008,14 @@ public class OffsetsApiIntegrationTest {
*/
private void verifyExpectedSourceConnectorOffsets(String connectorName, int numTasks,
int expectedOffset, String conditionDetails) throws InterruptedException {
TestUtils.waitForCondition(() -> {
waitForCondition(() -> {
ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
// The MonitorableSourceConnector has a source partition per task
if (offsets.offsets().size() != numTasks) {
return false;
}
for (ConnectorOffset offset : offsets.offsets()) {
assertTrue(((String) offset.partition().get("task.id")).startsWith(CONNECTOR_NAME));
assertTrue(((String) offset.partition().get("task.id")).startsWith(connectorName));
if ((Integer) offset.offset().get("saved") != expectedOffset) {
return false;
}
@ -969,7 +1033,7 @@ public class OffsetsApiIntegrationTest {
* @throws InterruptedException if the thread is interrupted while waiting for the offsets to be empty
*/
private void verifyEmptyConnectorOffsets(String connectorName) throws InterruptedException {
TestUtils.waitForCondition(() -> {
waitForCondition(() -> {
ConnectorOffsets offsets = connect.connectorOffsets(connectorName);
return offsets.offsets().isEmpty();
}, OFFSET_READ_TIMEOUT_MS, "Connector offsets should be empty after resetting offsets");