KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM (#15165)

Reviewers: Greg Harris <greg.harris@aiven.io>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Author: Anatoly Popov, 2024-06-03 23:46:40 +03:00 (committed by GitHub)
parent c6c39c0062
commit 8a882a77a4
2 changed files with 71 additions and 6 deletions

ConsumerTask.java

@@ -238,10 +238,15 @@ class ConsumerTask implements Runnable, Closeable {
         this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
         // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
         final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
-                .stream()
-                .filter(utp -> !utp.isAssigned)
-                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
-                .collect(Collectors.toSet());
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> utp.metadataPartition)
+                // When resetting to the beginning, the last read offset has to be cleared as well.
+                // Otherwise, if the next reassignment for the same metadata partition arrives before the
+                // records of the already assigned topics have been re-read, the seek goes back to the stale last read offset.
+                .peek(readOffsetsByMetadataPartition::remove)
+                .map(ConsumerTask::toRemoteLogPartition)
+                .collect(Collectors.toSet());
         consumer.seekToBeginning(seekToBeginOffsetPartitions);
         // for other metadata partitions, read from the offset where the processing left last time.
         remoteLogPartitions.stream()
@@ -463,4 +468,4 @@ class ConsumerTask implements Runnable, Closeable {
                 '}';
         }
     }
-}
+}
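For context on the change above, the skipping scenario it guards against can be reduced to the following standalone sketch. It is not part of the commit; the names (SeekTrackerSketch, recordRead, onReassignment, clearStaleOffset) are hypothetical and only mirror the pattern in ConsumerTask, and the fallback to offset 0 when no read offset is recorded is a simplification. The idea: on a reassignment, metadata partitions backing newly assigned user partitions are seeked to the beginning, while partitions that stay assigned resume from the last read offset; if that last read offset is not cleared when seeking to the beginning, a second reassignment arriving before the re-read completes jumps straight back to the stale offset, so the earlier records are never reprocessed.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the offset bookkeeping in ConsumerTask; it prints the seeks
// it would issue instead of driving a real consumer.
public class SeekTrackerSketch {

    // last read offset per metadata partition, analogous to readOffsetsByMetadataPartition
    private final Map<Integer, Long> readOffsets = new HashMap<>();

    void recordRead(int metadataPartition, long offset) {
        readOffsets.put(metadataPartition, offset);
    }

    // clearStaleOffset = false models the behaviour before this commit, true the behaviour after it
    void onReassignment(int metadataPartition, boolean hasNewlyAssignedUserPartition, boolean clearStaleOffset) {
        if (hasNewlyAssignedUserPartition) {
            if (clearStaleOffset) {
                readOffsets.remove(metadataPartition);      // the fix: forget the last read offset
            }
            System.out.println("seekToBeginning(" + metadataPartition + ")");
        } else {
            // metadata partitions that stay assigned resume from the last read offset (0 if unknown)
            long resumeFrom = readOffsets.getOrDefault(metadataPartition, 0L);
            System.out.println("seek(" + metadataPartition + ", " + resumeFrom + ")");
        }
    }

    public static void main(String[] args) {
        SeekTrackerSketch tracker = new SeekTrackerSketch();
        tracker.recordRead(0, 1L);                 // records up to offset 1 already consumed
        tracker.onReassignment(0, true, false);    // new user partition arrives -> seekToBeginning(0)
        tracker.onReassignment(0, false, false);   // second reassignment before re-reading -> seek(0, 1): offset 0 is skipped
        tracker.recordRead(0, 1L);
        tracker.onReassignment(0, true, true);     // with the fix, the stale entry is removed here ...
        tracker.onReassignment(0, false, true);    // ... so the follow-up seeks to 0 and nothing is skipped
    }
}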

ConsumerTaskTest.java

@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicIdPartition;
@@ -64,8 +65,12 @@ import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.t
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class ConsumerTaskTest {
@@ -85,7 +90,7 @@ public class ConsumerTaskTest {
     public void beforeEach() {
         final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
                 .collect(Collectors.toMap(Function.identity(), e -> 0L));
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
         consumer.updateBeginningOffsets(offsets);
         consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
         thread = new Thread(consumerTask);
@@ -254,6 +259,61 @@ public class ConsumerTaskTest {
         assertEquals(3, handler.metadataCounter);
     }
 
+    @Test
+    public void testCanReprocessSkippedRecords() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
+        assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
+        assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0));
+        final int metadataPartition = partitioner.metadataPartition(tpId0);
+        final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
+
+        // Mock the consumer so that the test can wait for the second reassignment before real records are returned
+        doAnswer(invocation -> {
+            if (consumerTask.isUserPartitionAssigned(tpId1) && !consumerTask.isUserPartitionAssigned(tpId3)) {
+                return ConsumerRecords.empty();
+            } else {
+                return invocation.callRealMethod();
+            }
+        }).when(consumer).poll(any());
+
+        consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
+        consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition), 0L));
+        final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
+        consumerTask.addAssignmentsForPartitions(assignments);
+        thread.start();
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned");
+
+        // Add metadata records in the order opposite to the order of assignments
+        addRecord(consumer, metadataPartition, tpId1, 0);
+        addRecord(consumer, metadataPartition, tpId0, 1);
+        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+        // Only one record is processed: the tpId1 record is skipped as unassigned,
+        // but the read offset is 1, i.e. the record for tpId1 has already been read by the consumer
+        assertEquals(1, handler.metadataCounter);
+
+        // Add the assignment for tpId1 after its metadata record has already been read
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
+
+        // Re-add the assignment for tpId0 to trigger the reset to the last read offset,
+        // and add tpId3, which maps to a different metadata partition, to trigger an update of the metadata snapshot
+        HashSet<TopicIdPartition> partitions = new HashSet<>();
+        partitions.add(tpId0);
+        partitions.add(tpId3);
+        consumerTask.addAssignmentsForPartitions(partitions);
+
+        // Explicitly re-add the records since MockConsumer drops them on poll
+        addRecord(consumer, metadataPartition, tpId1, 0);
+        addRecord(consumer, metadataPartition, tpId0, 1);
+
+        // Wait for all metadata records to be re-read from the first metadata partition
+        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+        // Verify that all the metadata records from the first metadata partition were processed
+        TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record");
+    }
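
Aside, not part of the commit: the "MockConsumer drops them on poll" comment above reflects how MockConsumer behaves — records queued with addRecord are handed out once and then discarded by poll, so seeking back does not re-deliver them and the test has to re-add them. A minimal, self-contained sketch of that behaviour follows; the class name MockConsumerReplaySketch, the topic name, and the empty byte[] payloads are arbitrary illustrations.

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerReplaySketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("sample-metadata", 0);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 0L, new byte[0], new byte[0]));
        System.out.println(consumer.poll(Duration.ofMillis(10)).count());   // 1: the record is delivered once

        consumer.seekToBeginning(Collections.singleton(tp));
        System.out.println(consumer.poll(Duration.ofMillis(10)).count());   // 0: the record was dropped by the first poll

        consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 0L, new byte[0], new byte[0]));
        System.out.println(consumer.poll(Duration.ofMillis(10)).count());   // 1: delivered again only after re-adding it
    }
}
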
     @Test
     public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
         final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);