MINOR: Fix consumer group warmup in MirrorMaker 2 integration tests (#16771)

Reviewers: Greg Harris <greg.harris@aiven.io>
Author: Chris Egerton
Date:   2024-08-02 19:57:05 +02:00 (committed by GitHub)
Commit: 5afdb17092 (parent 16cc877533)
2 changed files with 44 additions and 17 deletions
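Previously, warmUpConsumer relied on a single 500 ms poll() to join the consumer group; if the rebalance had not completed by the time that poll returned, the group could end up without committed offsets and the downstream checkpoint assertions became flaky. The fix registers a ConsumerRebalanceListener and keeps polling until an assignment is observed or a 60-second timer expires. Below is a minimal, self-contained sketch of that join-detection pattern; the bootstrap address, group id, and topic are placeholders, and a plain deadline stands in for the test's internal Timer utility:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class WarmupSketch {
        public static void main(String[] args) {
            Map<String, Object> props = Map.of(
                    "bootstrap.servers", "localhost:9092",  // placeholder
                    "group.id", "warmup-group",             // placeholder
                    "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer",
                    "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            AtomicBoolean joinedGroup = new AtomicBoolean(false);
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // no-op
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Fires inside poll() only once the consumer has actually joined the group
                    joinedGroup.set(true);
                }
            };

            long deadline = System.currentTimeMillis() + 60_000;
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test-topic-1"), listener);
                // Keep polling until the assignment callback runs or the budget runs out
                do {
                    consumer.poll(Duration.ofMillis(500));
                } while (!joinedGroup.get() && System.currentTimeMillis() < deadline);
            }
        }
    }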

MirrorConnectorsIntegrationBaseTest.java

@@ -26,12 +26,12 @@ import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
@@ -81,6 +81,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -123,7 +124,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
     private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
     private static final int NUM_WORKERS = 3;
-    protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
+    protected static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofMillis(500L);
     protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
     protected static final String BACKUP_CLUSTER_ALIAS = "backup";
     protected static final List<Class<? extends Connector>> CONNECTOR_LIST = Arrays.asList(
@@ -372,7 +373,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
             primaryConsumer.assign(backupOffsets.keySet());
             backupOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT);
             primaryConsumer.commitAsync();

             assertTrue(primaryConsumer.position(new TopicPartition(backupTopic1, 0)) > 0, "Consumer failed over to zero offset.");
@@ -393,7 +394,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
             primaryConsumer.assign(primaryOffsets.keySet());
             primaryOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT);
             primaryConsumer.commitAsync();

             assertTrue(primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) > 0, "Consumer failed back to zero downstream offset.");
@@ -775,7 +776,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
         log.info("Initial checkpoints: {}", initialCheckpoints);
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT);
             primaryConsumer.commitSync(partialOffsets(allRecords, 0.9f));
             partialCheckpoints = waitForNewCheckpointOnAllPartitions(
                     backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
@@ -791,7 +792,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT);
             primaryConsumer.commitSync(partialOffsets(allRecords, 0.1f));
             finalCheckpoints = waitForNewCheckpointOnAllPartitions(
                     backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
@@ -1278,7 +1279,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
             final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
             waitForCondition(() -> {
-                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
                 records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
                 return expectedRecords == totalConsumedRecords.addAndGet(records.count());
             }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
@@ -1342,7 +1343,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     }

     private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
-        ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+        ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT);
         // After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
         for (TopicPartition tp : records.partitions()) {
             int count = records.records(tp).size();
@@ -1357,7 +1358,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             throws InterruptedException {
         final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
         waitForCondition(() -> {
-            ConsumerRecords<T, T> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<T, T> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
             return numExpectedRecords == totalConsumedRecords.addAndGet(records.count());
         }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
         consumer.commitSync();
@@ -1420,9 +1421,27 @@ public class MirrorConnectorsIntegrationBaseTest {
     }

     private void warmUpConsumer(String clusterName, EmbeddedKafkaCluster kafkaCluster, Map<String, Object> consumerProps, String topic) {
-        try (Consumer<?, ?> dummyConsumer = kafkaCluster.createConsumerAndSubscribeTo(consumerProps, topic)) {
+        AtomicBoolean joinedGroup = new AtomicBoolean(false);
+        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                // no-op
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                joinedGroup.set(true);
+            }
+        };
+
+        Timer warmupTimer = Time.SYSTEM.timer(60_000);
+
+        try (Consumer<?, ?> dummyConsumer = kafkaCluster.createConsumerAndSubscribeTo(consumerProps, rebalanceListener, topic)) {
             // poll to ensure we've joined the group
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            do {
+                dummyConsumer.poll(CONSUMER_POLL_TIMEOUT);
+                warmupTimer.update();
+            } while (!joinedGroup.get() && !warmupTimer.isExpired());
+
             // force the consumer to have a known position on every topic partition
             // so that it will be able to commit offsets for that position
@@ -1430,16 +1449,15 @@ public class MirrorConnectorsIntegrationBaseTest {
             Set<TopicPartition> topicPartitionsPendingPosition = IntStream.range(0, NUM_PARTITIONS)
                     .mapToObj(partition -> new TopicPartition(topic, partition))
                     .collect(Collectors.toSet());
-            Timer positionTimer = Time.SYSTEM.timer(60_000);
-            while (!positionTimer.isExpired() && !topicPartitionsPendingPosition.isEmpty()) {
+            while (!warmupTimer.isExpired() && !topicPartitionsPendingPosition.isEmpty()) {
                 Set<TopicPartition> topicPartitionsWithPosition = new HashSet<>();
                 topicPartitionsPendingPosition.forEach(topicPartition -> {
                     try {
-                        positionTimer.update();
-                        dummyConsumer.position(topicPartition, Duration.ofMillis(positionTimer.remainingMs()));
+                        warmupTimer.update();
+                        dummyConsumer.position(topicPartition, Duration.ofMillis(warmupTimer.remainingMs()));
                         topicPartitionsWithPosition.add(topicPartition);
-                    } catch (KafkaException e) {
+                    } catch (Exception e) {
                         log.warn("Failed to calculate consumer position for {} on cluster {}", topicPartition, clusterName);
                     }
                 });
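A note on the design: the join phase and the position phase now share one 60-second Timer (positionTimer was renamed warmupTimer and hoisted above the try block), so the whole warmup is bounded by a single budget rather than each phase getting its own; the catch clause was also widened from KafkaException to Exception, presumably so any failure while resolving a position is retried as long as time remains. Here is a minimal sketch of that shared-timer idiom using Kafka's org.apache.kafka.common.utils.Timer (an internal utility, used as-is by the test above; the work method is a stand-in):

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.common.utils.Timer;

    public class SharedTimerSketch {
        public static void main(String[] args) throws InterruptedException {
            // One timer bounds several phases of work: remainingMs() shrinks as
            // update() is called, so later phases only get the leftover budget.
            Timer timer = Time.SYSTEM.timer(60_000);
            while (!timer.isExpired()) {
                doSomeWork();
                timer.update();                   // refresh the timer's notion of "now"
                long left = timer.remainingMs();  // budget left for subsequent phases
                if (left < 1_000) {
                    break;                        // not enough budget to bother continuing
                }
            }
        }

        private static void doSomeWork() throws InterruptedException {
            Thread.sleep(100); // stand-in for the poll()/position() calls in the test
        }
    }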

EmbeddedKafkaCluster.java

@@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -639,8 +640,16 @@ public class EmbeddedKafkaCluster {
     }

     public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> consumerProps, String... topics) {
+        return createConsumerAndSubscribeTo(consumerProps, null, topics);
+    }
+
+    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> consumerProps, ConsumerRebalanceListener rebalanceListener, String... topics) {
         KafkaConsumer<byte[], byte[]> consumer = createConsumer(consumerProps);
-        consumer.subscribe(Arrays.asList(topics));
+        if (rebalanceListener != null) {
+            consumer.subscribe(Arrays.asList(topics), rebalanceListener);
+        } else {
+            consumer.subscribe(Arrays.asList(topics));
+        }
         return consumer;
     }
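To illustrate the new overload, here is a hedged sketch of a helper a test could build on top of it; subscribeAndAwaitAssignment is a hypothetical name and not part of this change, and the 60-second deadline mirrors the warmup timer above:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

    public class AwaitAssignmentSketch {

        // Blocks (up to a deadline) until the consumer has actually joined its
        // group, then hands the subscribed consumer back to the caller.
        public static KafkaConsumer<byte[], byte[]> subscribeAndAwaitAssignment(
                EmbeddedKafkaCluster kafkaCluster, Map<String, Object> consumerProps, String topic) {
            AtomicBoolean assigned = new AtomicBoolean(false);
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // no-op
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    assigned.set(true);
                }
            };
            KafkaConsumer<byte[], byte[]> consumer =
                    kafkaCluster.createConsumerAndSubscribeTo(consumerProps, listener, topic);
            long deadline = System.currentTimeMillis() + 60_000;
            while (!assigned.get() && System.currentTimeMillis() < deadline) {
                consumer.poll(Duration.ofMillis(500)); // rebalance callbacks fire inside poll()
            }
            return consumer;
        }
    }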