mirror of https://github.com/apache/kafka.git
KAFKA-16383: Ensure tasks have already polled their consumers before producing verified records in MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest (#16598)
Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
parent
b015a83f6d
commit
177b38ad66
|
@ -932,6 +932,9 @@ public class MirrorConnectorsIntegrationBaseTest {
|
||||||
String topic = "test-topic-1";
|
String topic = "test-topic-1";
|
||||||
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
|
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
|
||||||
|
|
||||||
|
String sentinelTopic = "test-topic-sentinel";
|
||||||
|
primary.kafka().createTopic(sentinelTopic);
|
||||||
|
|
||||||
// consume from the ends of topics when no committed offsets are found
|
// consume from the ends of topics when no committed offsets are found
|
||||||
mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
|
mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||||
// one way replication from primary to backup
|
// one way replication from primary to backup
|
||||||
|
@ -939,18 +942,44 @@ public class MirrorConnectorsIntegrationBaseTest {
|
||||||
mm2Config = new MirrorMakerConfig(mm2Props);
|
mm2Config = new MirrorMakerConfig(mm2Props);
|
||||||
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
|
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
|
||||||
|
|
||||||
|
String backupSentinelTopic = remoteTopicName(sentinelTopic, PRIMARY_CLUSTER_ALIAS);
|
||||||
|
waitForTopicCreated(backup, backupSentinelTopic);
|
||||||
|
|
||||||
|
// wait for proof that the task has managed to poll its consumers at least once;
|
||||||
|
// this should also mean that it knows the proper end offset of the other test topic,
|
||||||
|
// and will consume exactly the expected number of records that we produce after
|
||||||
|
// this assertion passes
|
||||||
|
// NOTE: this assumes that there is a single MirrorSourceTask instance running;
|
||||||
|
// if there are multiple tasks, the logic will need to be updated to ensure that each
|
||||||
|
// task has managed to poll its consumer before continuing
|
||||||
|
waitForCondition(
|
||||||
|
() -> {
|
||||||
|
primary.kafka().produce(sentinelTopic, "sentinel-value");
|
||||||
|
int sentinelValues = backup.kafka().consumeAll(1_000, backupSentinelTopic).count();
|
||||||
|
return sentinelValues > 0;
|
||||||
|
},
|
||||||
|
RECORD_TRANSFER_DURATION_MS,
|
||||||
|
"Records were not produced to sentinel topic in time"
|
||||||
|
);
|
||||||
|
|
||||||
// produce some more messages to the topic, now that MM2 is running and replication should be taking place
|
// produce some more messages to the topic, now that MM2 is running and replication should be taking place
|
||||||
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
|
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
|
||||||
|
|
||||||
String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
|
String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
|
||||||
// wait for at least the expected number of records to be replicated to the backup cluster
|
// wait for at least the expected number of records to be replicated to the backup cluster
|
||||||
backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
|
backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic);
|
||||||
|
|
||||||
// consume all records from backup cluster
|
// consume all records from backup cluster
|
||||||
ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
|
ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
|
||||||
|
|
||||||
// ensure that we only replicated the records produced after startup
|
// ensure that we only replicated the records produced after startup
|
||||||
replicatedRecords.partitions().forEach(topicPartition -> {
|
replicatedRecords.partitions().forEach(topicPartition -> {
|
||||||
int replicatedCount = replicatedRecords.records(topicPartition).size();
|
int replicatedCount = replicatedRecords.records(topicPartition).size();
|
||||||
assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
|
assertEquals(
|
||||||
|
NUM_RECORDS_PER_PARTITION,
|
||||||
|
replicatedCount,
|
||||||
|
"Unexpected number of replicated records for partition " + topicPartition.partition()
|
||||||
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1324,7 +1353,6 @@ public class MirrorConnectorsIntegrationBaseTest {
|
||||||
private static Map<String, String> basicMM2Config() {
|
private static Map<String, String> basicMM2Config() {
|
||||||
Map<String, String> mm2Props = new HashMap<>();
|
Map<String, String> mm2Props = new HashMap<>();
|
||||||
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
|
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
|
||||||
mm2Props.put("max.tasks", "10");
|
|
||||||
mm2Props.put("groups", "consumer-group-.*");
|
mm2Props.put("groups", "consumer-group-.*");
|
||||||
mm2Props.put("sync.topic.acls.enabled", "false");
|
mm2Props.put("sync.topic.acls.enabled", "false");
|
||||||
mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
|
mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
|
||||||
|
|
Loading…
Reference in New Issue