KAFKA-16118; Coordinator unloading fails when replica is deleted (#15182)

When a replica is deleted, the unloading procedure of the coordinator is called with an empty leader epoch. However, the current implementation of the new group coordinator throws an exception in this case. My bad. This patch updates the logic to handle it correctly.

We discovered the bug in our testing environment. We will add a system test or an integration test in a subsequent patch to better exercise this path.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-01-12 15:34:52 -08:00 committed by GitHub
parent 21227bda61
commit 3a6e699f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 11 deletions

View File

@ -1029,12 +1029,9 @@ public class GroupCoordinatorService implements GroupCoordinator {
OptionalInt groupMetadataPartitionLeaderEpoch
) {
throwIfNotActive();
if (!groupMetadataPartitionLeaderEpoch.isPresent()) {
throw new IllegalArgumentException("The leader epoch should always be provided in KRaft.");
}
runtime.scheduleUnloadOperation(
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex),
groupMetadataPartitionLeaderEpoch.getAsInt()
groupMetadataPartitionLeaderEpoch
);
}

View File

@ -44,6 +44,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -1669,11 +1670,12 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* leader anymore.
*
* @param tp The topic partition of the coordinator.
* @param partitionEpoch The partition epoch.
* @param partitionEpoch The partition epoch as an optional value.
* An empty value means that the topic was deleted.
*/
public void scheduleUnloadOperation(
TopicPartition tp,
int partitionEpoch
OptionalInt partitionEpoch
) {
throwIfNotRunning();
log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch);
@ -1683,7 +1685,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (context != null) {
try {
context.lock.lock();
if (context.epoch < partitionEpoch) {
if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
context.transitionTo(CoordinatorState.CLOSED);
coordinators.remove(tp, context);

View File

@ -336,7 +336,26 @@ public class GroupCoordinatorServiceTest {
verify(runtime, times(1)).scheduleUnloadOperation(
new TopicPartition("__consumer_offsets", 5),
10
OptionalInt.of(10)
);
}
@Test
public void testOnResignationWithEmptyLeaderEpoch() {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics()
);
service.startup(() -> 1);
service.onResignation(5, OptionalInt.empty());
verify(runtime, times(1)).scheduleUnloadOperation(
new TopicPartition("__consumer_offsets", 5),
OptionalInt.empty()
);
}

View File

@ -49,6 +49,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -669,7 +670,7 @@ public class CoordinatorRuntimeTest {
assertEquals(10, ctx.epoch);
// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, ctx.epoch + 1);
runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
assertEquals(CLOSED, ctx.state);
// Verify that onUnloaded is called.
@ -684,6 +685,60 @@ public class CoordinatorRuntimeTest {
// Getting the coordinator context fails because it no longer exists.
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
}
@Test
public void testScheduleUnloadingWithEmptyEpoch() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
// Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, OptionalInt.empty());
assertEquals(CLOSED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
// Verify that the listener is deregistered.
verify(writer, times(1)).deregisterListener(
eq(TP),
any(PartitionWriter.Listener.class)
);
// Getting the coordinator context fails because it no longer exists.
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
}
@Test
public void testScheduleUnloadingWhenContextDoesntExist() {
MockTimer timer = new MockTimer();
@ -717,7 +772,7 @@ public class CoordinatorRuntimeTest {
// is asked to unload its state. The unload event is skipped in this case.
// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, 11);
runtime.scheduleUnloadOperation(TP, OptionalInt.of(11));
// Verify that onUnloaded is not called.
verify(coordinator, times(0)).onUnloaded();
@ -765,7 +820,7 @@ public class CoordinatorRuntimeTest {
// Unloading with a previous epoch is a no-op. The coordinator stays
// in active with the correct epoch.
runtime.scheduleUnloadOperation(TP, 0);
runtime.scheduleUnloadOperation(TP, OptionalInt.of(0));
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
}