mirror of https://github.com/apache/kafka.git
KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask the exit procedures (#9698)
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures using within the Connect worker. This normally works great when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it would reset the (static) exit procedures, and any problems during shutdown of the second embedded Connect cluster would cause the worker to shut down the JVM running the tests. Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures and instruct the `EmbeddedConnectClusters` instances (via the existing builder method) to not mask the procedures. Author: Randall Hauch <rhauch@gmail.com> Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
d33dc1869b
commit
874198b873
|
@ -18,11 +18,15 @@ package org.apache.kafka.connect.mirror;
|
|||
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
|
||||
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
|
||||
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
|
||||
import org.apache.kafka.test.IntegrationTest;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -31,13 +35,17 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.apache.kafka.test.TestUtils.waitForCondition;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -56,21 +64,57 @@ public class MirrorConnectorsIntegrationTest {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
|
||||
|
||||
private static final int NUM_RECORDS_PRODUCED = 100; // to save trees
|
||||
private static final int NUM_RECORDS_PER_PARTITION = 10;
|
||||
private static final int NUM_PARTITIONS = 10;
|
||||
private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
|
||||
private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
|
||||
private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
|
||||
private static final int CHECKPOINT_DURATION_MS = 20_000;
|
||||
private static final int RECORD_CONSUME_DURATION_MS = 20_000;
|
||||
private static final int OFFSET_SYNC_DURATION_MS = 30_000;
|
||||
|
||||
private MirrorMakerConfig mm2Config;
|
||||
private volatile boolean shuttingDown;
|
||||
private Map<String, String> mm2Props;
|
||||
private MirrorMakerConfig mm2Config;
|
||||
private EmbeddedConnectCluster primary;
|
||||
private EmbeddedConnectCluster backup;
|
||||
|
||||
private Exit.Procedure exitProcedure;
|
||||
private Exit.Procedure haltProcedure;
|
||||
|
||||
@Before
|
||||
public void setup() throws InterruptedException {
|
||||
shuttingDown = false;
|
||||
exitProcedure = (code, message) -> {
|
||||
if (shuttingDown) {
|
||||
// ignore this since we're shutting down Connect and Kafka and timing isn't always great
|
||||
return;
|
||||
}
|
||||
if (code != 0) {
|
||||
String exitMessage = "Abrupt service exit with code " + code + " and message " + message;
|
||||
log.warn(exitMessage);
|
||||
throw new UngracefulShutdownException(exitMessage);
|
||||
}
|
||||
};
|
||||
haltProcedure = (code, message) -> {
|
||||
if (shuttingDown) {
|
||||
// ignore this since we're shutting down Connect and Kafka and timing isn't always great
|
||||
return;
|
||||
}
|
||||
if (code != 0) {
|
||||
String haltMessage = "Abrupt service halt with code " + code + " and message " + message;
|
||||
log.warn(haltMessage);
|
||||
throw new UngracefulShutdownException(haltMessage);
|
||||
}
|
||||
};
|
||||
// Override the exit and halt procedure that Connect and Kafka will use. For these integration tests,
|
||||
// we don't want to exit the JVM and instead simply want to fail the test
|
||||
Exit.setExitProcedure(exitProcedure);
|
||||
Exit.setHaltProcedure(haltProcedure);
|
||||
|
||||
Properties brokerProps = new Properties();
|
||||
brokerProps.put("auto.create.topics.enable", "false");
|
||||
|
||||
Map<String, String> mm2Props = new HashMap<>();
|
||||
mm2Props = new HashMap<>();
|
||||
mm2Props.put("clusters", "primary, backup");
|
||||
mm2Props.put("max.tasks", "10");
|
||||
mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
|
||||
|
@ -100,6 +144,7 @@ public class MirrorConnectorsIntegrationTest {
|
|||
.numBrokers(1)
|
||||
.brokerProps(brokerProps)
|
||||
.workerProps(primaryWorkerProps)
|
||||
.maskExitProcedures(false)
|
||||
.build();
|
||||
|
||||
backup = new EmbeddedConnectCluster.Builder()
|
||||
|
@ -108,13 +153,14 @@ public class MirrorConnectorsIntegrationTest {
|
|||
.numBrokers(1)
|
||||
.brokerProps(brokerProps)
|
||||
.workerProps(backupWorkerProps)
|
||||
.maskExitProcedures(false)
|
||||
.build();
|
||||
|
||||
primary.start();
|
||||
primary.assertions().assertAtLeastNumWorkersAreUp(3,
|
||||
"Workers of primary-connect-cluster did not start in time.");
|
||||
backup.start();
|
||||
primary.assertions().assertAtLeastNumWorkersAreUp(3,
|
||||
backup.assertions().assertAtLeastNumWorkersAreUp(3,
|
||||
"Workers of backup-connect-cluster did not start in time.");
|
||||
|
||||
// create these topics before starting the connectors so we don't need to wait for discovery
|
||||
|
@ -125,23 +171,18 @@ public class MirrorConnectorsIntegrationTest {
|
|||
backup.kafka().createTopic("primary.test-topic-1", 1);
|
||||
backup.kafka().createTopic("heartbeats", 1);
|
||||
|
||||
for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
|
||||
primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
|
||||
backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
|
||||
}
|
||||
// produce to all partitions of test-topic-1
|
||||
produceMessages(primary, "test-topic-1", "message-1-");
|
||||
produceMessages(backup, "test-topic-1", "message-2-");
|
||||
|
||||
// create consumers before starting the connectors so we don't need to wait for discovery
|
||||
Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
|
||||
"group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
|
||||
consumer1.poll(Duration.ofMillis(500));
|
||||
consumer1.commitSync();
|
||||
consumer1.close();
|
||||
|
||||
Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
|
||||
"group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
|
||||
consumer2.poll(Duration.ofMillis(500));
|
||||
consumer2.commitSync();
|
||||
consumer2.close();
|
||||
// Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
|
||||
Map<String, Object> dummyProps = Collections.singletonMap("group.id", "consumer-group-dummy");
|
||||
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
|
||||
consumeAllMessages(dummyConsumer);
|
||||
dummyConsumer.close();
|
||||
dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
|
||||
consumeAllMessages(dummyConsumer);
|
||||
dummyConsumer.close();
|
||||
|
||||
log.info("primary REST service: {}", primary.endpointForResource("connectors"));
|
||||
log.info("backup REST service: {}", backup.endpointForResource("connectors"));
|
||||
|
@ -153,41 +194,27 @@ public class MirrorConnectorsIntegrationTest {
|
|||
mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
|
||||
mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
|
||||
mm2Config = new MirrorMakerConfig(mm2Props);
|
||||
}
|
||||
|
||||
|
||||
private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
|
||||
MirrorMakerConfig mm2Config, String primary, String backup) throws InterruptedException {
|
||||
|
||||
connectCluster.configureConnector("MirrorSourceConnector",
|
||||
mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorSourceConnector.class));
|
||||
connectCluster.configureConnector("MirrorCheckpointConnector",
|
||||
mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorCheckpointConnector.class));
|
||||
connectCluster.configureConnector("MirrorHeartbeatConnector",
|
||||
mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorHeartbeatConnector.class));
|
||||
|
||||
// we wait for the connector and tasks to come up for each connector, so that when we do the
|
||||
// actual testing, we are certain that the tasks are up and running; this will prevent
|
||||
// flaky tests where the connector and tasks didn't start up in time for the tests to be
|
||||
// run
|
||||
Set<String> connectorNames = new HashSet<>(Arrays.asList("MirrorSourceConnector",
|
||||
"MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
|
||||
"MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
|
||||
|
||||
backup.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
|
||||
MirrorSourceConnector.class));
|
||||
|
||||
backup.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
|
||||
MirrorCheckpointConnector.class));
|
||||
|
||||
backup.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
|
||||
MirrorHeartbeatConnector.class));
|
||||
|
||||
waitUntilMirrorMakerIsRunning(backup, connectorNames);
|
||||
|
||||
primary.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
|
||||
MirrorSourceConnector.class));
|
||||
|
||||
primary.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
|
||||
MirrorCheckpointConnector.class));
|
||||
|
||||
primary.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
|
||||
MirrorHeartbeatConnector.class));
|
||||
|
||||
waitUntilMirrorMakerIsRunning(primary, connectorNames);
|
||||
}
|
||||
|
||||
|
||||
private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
|
||||
Set<String> connNames) throws InterruptedException {
|
||||
for (String connector : connNames) {
|
||||
for (String connector : connectorNames) {
|
||||
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
|
||||
"Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
|
||||
}
|
||||
|
@ -195,20 +222,49 @@ public class MirrorConnectorsIntegrationTest {
|
|||
|
||||
@After
|
||||
public void close() {
|
||||
for (String x : primary.connectors()) {
|
||||
primary.deleteConnector(x);
|
||||
try {
|
||||
for (String x : primary.connectors()) {
|
||||
primary.deleteConnector(x);
|
||||
}
|
||||
for (String x : backup.connectors()) {
|
||||
backup.deleteConnector(x);
|
||||
}
|
||||
deleteAllTopics(primary.kafka());
|
||||
deleteAllTopics(backup.kafka());
|
||||
} finally {
|
||||
shuttingDown = true;
|
||||
try {
|
||||
try {
|
||||
primary.stop();
|
||||
} finally {
|
||||
backup.stop();
|
||||
}
|
||||
} finally {
|
||||
Exit.resetExitProcedure();
|
||||
Exit.resetHaltProcedure();
|
||||
}
|
||||
}
|
||||
for (String x : backup.connectors()) {
|
||||
backup.deleteConnector(x);
|
||||
}
|
||||
deleteAllTopics(primary.kafka());
|
||||
deleteAllTopics(backup.kafka());
|
||||
primary.stop();
|
||||
backup.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplication() throws InterruptedException {
|
||||
String consumerGroupName = "consumer-group-testReplication";
|
||||
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
|
||||
put("group.id", consumerGroupName);
|
||||
put("auto.offset.reset", "latest");
|
||||
}};
|
||||
|
||||
// create consumers before starting the connectors so we don't need to wait for discovery
|
||||
Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
|
||||
consumeAllMessages(primaryConsumer, 0);
|
||||
primaryConsumer.close();
|
||||
|
||||
Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
|
||||
consumeAllMessages(backupConsumer, 0);
|
||||
backupConsumer.close();
|
||||
|
||||
waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
|
||||
waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary");
|
||||
MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
|
||||
MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
|
||||
|
||||
|
@ -220,49 +276,52 @@ public class MirrorConnectorsIntegrationTest {
|
|||
backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
|
||||
assertEquals("Records were not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
|
||||
primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count());
|
||||
|
||||
assertEquals("Primary cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
|
||||
primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count());
|
||||
assertEquals("Backup cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
|
||||
backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count());
|
||||
assertTrue("Heartbeats were not emitted to primary cluster.", primary.kafka().consume(1,
|
||||
RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not emitted to backup cluster.", backup.kafka().consume(1,
|
||||
RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not replicated downstream to backup cluster.", backup.kafka().consume(1,
|
||||
RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not replicated downstream to primary cluster.", primary.kafka().consume(1,
|
||||
RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0);
|
||||
|
||||
assertTrue("Heartbeats were not emitted to primary cluster.",
|
||||
primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not emitted to backup cluster.",
|
||||
backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not replicated downstream to backup cluster.",
|
||||
backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
|
||||
assertTrue("Heartbeats were not replicated downstream to primary cluster.",
|
||||
primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0);
|
||||
|
||||
assertTrue("Did not find upstream primary cluster.", backupClient.upstreamClusters().contains("primary"));
|
||||
assertEquals("Did not calculate replication hops correctly.", 1, backupClient.replicationHops("primary"));
|
||||
assertTrue("Did not find upstream backup cluster.", primaryClient.upstreamClusters().contains("backup"));
|
||||
assertEquals("Did not calculate replication hops correctly.", 1, primaryClient.replicationHops("backup"));
|
||||
assertTrue("Checkpoints were not emitted downstream to backup cluster.", backup.kafka().consume(1,
|
||||
CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
|
||||
|
||||
Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets("consumer-group-1", "primary",
|
||||
assertTrue("Checkpoints were not emitted downstream to backup cluster.",
|
||||
backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
|
||||
|
||||
Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, "primary",
|
||||
Duration.ofMillis(CHECKPOINT_DURATION_MS));
|
||||
|
||||
assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
|
||||
new TopicPartition("primary.test-topic-1", 0)));
|
||||
|
||||
// Failover consumer group to backup cluster.
|
||||
Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
|
||||
consumer1.assign(backupOffsets.keySet());
|
||||
backupOffsets.forEach(consumer1::seek);
|
||||
consumer1.poll(Duration.ofMillis(500));
|
||||
consumer1.commitSync();
|
||||
backupConsumer = backup.kafka().createConsumer(consumerProps);
|
||||
backupConsumer.assign(allPartitions("test-topic-1", "primary.test-topic-1"));
|
||||
seek(backupConsumer, backupOffsets);
|
||||
consumeAllMessages(backupConsumer, 0);
|
||||
|
||||
assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
|
||||
assertTrue("Consumer failedover to zero offset.", backupConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedover beyond expected offset.", backupConsumer.position(
|
||||
new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
|
||||
assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
|
||||
CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
|
||||
|
||||
consumer1.close();
|
||||
backupConsumer.close();
|
||||
|
||||
waitForCondition(() -> {
|
||||
try {
|
||||
return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
|
||||
return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
|
||||
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0));
|
||||
} catch (Throwable e) {
|
||||
return false;
|
||||
|
@ -271,49 +330,53 @@ public class MirrorConnectorsIntegrationTest {
|
|||
|
||||
waitForCondition(() -> {
|
||||
try {
|
||||
return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
|
||||
return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
|
||||
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
|
||||
} catch (Throwable e) {
|
||||
return false;
|
||||
}
|
||||
}, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
|
||||
|
||||
Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
|
||||
Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
|
||||
Duration.ofMillis(CHECKPOINT_DURATION_MS));
|
||||
|
||||
// Failback consumer group to primary cluster
|
||||
Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
|
||||
consumer2.assign(primaryOffsets.keySet());
|
||||
primaryOffsets.forEach(consumer2::seek);
|
||||
consumer2.poll(Duration.ofMillis(500));
|
||||
|
||||
assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
|
||||
new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
|
||||
assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
|
||||
new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
|
||||
|
||||
consumer2.close();
|
||||
|
||||
primaryClient.close();
|
||||
backupClient.close();
|
||||
|
||||
// Failback consumer group to primary cluster
|
||||
primaryConsumer = primary.kafka().createConsumer(consumerProps);
|
||||
primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
|
||||
seek(primaryConsumer, primaryOffsets);
|
||||
consumeAllMessages(primaryConsumer, 0);
|
||||
|
||||
assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
|
||||
assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
|
||||
new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
|
||||
assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
|
||||
new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
|
||||
|
||||
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
|
||||
// If offset translation was successful we expect no messages to be consumed after failback
|
||||
assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
|
||||
primaryConsumer.close();
|
||||
|
||||
// create more matching topics
|
||||
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
|
||||
backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
|
||||
|
||||
for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
|
||||
primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
|
||||
backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
|
||||
}
|
||||
produceMessages(primary, "test-topic-2", "message-3-", 1);
|
||||
produceMessages(backup, "test-topic-3", "message-4-", 1);
|
||||
|
||||
assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
|
||||
primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
|
||||
assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PRODUCED,
|
||||
backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
|
||||
|
||||
assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
|
||||
primary.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
|
||||
assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
|
||||
backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
|
||||
assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
|
||||
primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
|
||||
assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
|
||||
backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
|
||||
|
||||
assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
|
||||
primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
|
||||
assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
|
||||
backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
|
||||
}
|
||||
|
||||
private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
|
||||
|
@ -323,4 +386,68 @@ public class MirrorConnectorsIntegrationTest {
|
|||
} catch (Throwable e) {
|
||||
}
|
||||
}
|
||||
|
||||
private void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix) {
|
||||
produceMessages(cluster, topicName, msgPrefix, NUM_PARTITIONS);
|
||||
}
|
||||
|
||||
private void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix, int numPartitions) {
|
||||
// produce the configured number of records to all specified partitions
|
||||
int cnt = 0;
|
||||
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
|
||||
for (int p = 0; p < numPartitions; p++)
|
||||
cluster.kafka().produce(topicName, p, "key", msgPrefix + cnt++);
|
||||
}
|
||||
|
||||
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) throws InterruptedException {
|
||||
return consumeAllMessages(consumer, null, null);
|
||||
}
|
||||
|
||||
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer expectedRecords) throws InterruptedException {
|
||||
return consumeAllMessages(consumer, expectedRecords, null);
|
||||
}
|
||||
|
||||
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer numExpectedRecords, Duration timeoutDurationMs)
|
||||
throws InterruptedException {
|
||||
int expectedRecords = numExpectedRecords != null ? numExpectedRecords : NUM_RECORDS_PRODUCED;
|
||||
int timeoutMs = (int) (timeoutDurationMs != null ? timeoutDurationMs.toMillis() : RECORD_CONSUME_DURATION_MS);
|
||||
|
||||
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
|
||||
waitForCondition(() -> {
|
||||
ConsumerRecords<byte[], byte[]> crs = consumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord<byte[], byte[]> cr : crs)
|
||||
records.add(cr);
|
||||
assertTrue("Consumer consumed more records than expected: " + records.size() + " (expected " + expectedRecords + ").",
|
||||
records.size() <= expectedRecords);
|
||||
return records.size() == expectedRecords;
|
||||
}, timeoutMs, "Consumer could not consume all records in time.");
|
||||
|
||||
consumer.commitSync();
|
||||
return records.stream().collect(Collectors.groupingBy(c -> new TopicPartition(c.topic(), c.partition())));
|
||||
}
|
||||
|
||||
private void seek(Consumer<byte[], byte[]> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) throws InterruptedException {
|
||||
// In case offsets are replicated faster than actual records, wait until records are replicated before seeking
|
||||
waitForCondition(() -> {
|
||||
boolean ready = true;
|
||||
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());
|
||||
for (TopicPartition tp : offsets.keySet()) {
|
||||
if (offsets.get(tp).offset() > endOffsets.get(tp)) {
|
||||
ready = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!ready)
|
||||
Thread.sleep(1000);
|
||||
return ready;
|
||||
}, RECORD_TRANSFER_DURATION_MS, "Records were not replicated in time.");
|
||||
offsets.forEach(consumer::seek);
|
||||
}
|
||||
|
||||
private List<TopicPartition> allPartitions(String... topics) {
|
||||
return IntStream.range(0, NUM_PARTITIONS)
|
||||
.boxed()
|
||||
.flatMap(p -> Arrays.stream(topics).map(t -> new TopicPartition(t, p)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -154,8 +154,10 @@ public class EmbeddedConnectCluster {
|
|||
log.error("Could not stop kafka", e);
|
||||
throw new RuntimeException("Could not stop brokers", e);
|
||||
} finally {
|
||||
Exit.resetExitProcedure();
|
||||
Exit.resetHaltProcedure();
|
||||
if (maskExitProcedures) {
|
||||
Exit.resetExitProcedure();
|
||||
Exit.resetHaltProcedure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue