diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index c69739ac99b..1d4a50ecd50 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1029,12 +1029,9 @@ public class GroupCoordinatorService implements GroupCoordinator { OptionalInt groupMetadataPartitionLeaderEpoch ) { throwIfNotActive(); - if (!groupMetadataPartitionLeaderEpoch.isPresent()) { - throw new IllegalArgumentException("The leader epoch should always be provided in KRaft."); - } runtime.scheduleUnloadOperation( new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex), - groupMetadataPartitionLeaderEpoch.getAsInt() + groupMetadataPartitionLeaderEpoch ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index 6bfb52cf22d..0a3cf1d0898 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1669,11 +1670,12 @@ public class CoordinatorRuntime, U> implements Aut * leader anymore. * * @param tp The topic partition of the coordinator. - * @param partitionEpoch The partition epoch. + * @param partitionEpoch The partition epoch as an optional value. + * An empty value means that the topic was deleted. */ public void scheduleUnloadOperation( TopicPartition tp, - int partitionEpoch + OptionalInt partitionEpoch ) { throwIfNotRunning(); log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch); @@ -1683,7 +1685,7 @@ public class CoordinatorRuntime, U> implements Aut if (context != null) { try { context.lock.lock(); - if (context.epoch < partitionEpoch) { + if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) { log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch); context.transitionTo(CoordinatorState.CLOSED); coordinators.remove(tp, context); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 8a89b9cbb19..489f2f74acb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -336,7 +336,26 @@ public class GroupCoordinatorServiceTest { verify(runtime, times(1)).scheduleUnloadOperation( new TopicPartition("__consumer_offsets", 5), - 10 + OptionalInt.of(10) + ); + } + + @Test + public void testOnResignationWithEmptyLeaderEpoch() { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + service.startup(() -> 1); + service.onResignation(5, OptionalInt.empty()); + + verify(runtime, times(1)).scheduleUnloadOperation( + new TopicPartition("__consumer_offsets", 5), + OptionalInt.empty() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index f3db5d8c879..e2be7e5e6a7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -49,6 +49,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.OptionalInt; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -669,7 +670,7 @@ public class CoordinatorRuntimeTest { assertEquals(10, ctx.epoch); // Schedule the unloading. - runtime.scheduleUnloadOperation(TP, ctx.epoch + 1); + runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1)); assertEquals(CLOSED, ctx.state); // Verify that onUnloaded is called. @@ -684,6 +685,60 @@ public class CoordinatorRuntimeTest { // Getting the coordinator context fails because it no longer exists. assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); } + + @Test + public void testScheduleUnloadingWithEmptyEpoch() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Loads the coordinator. It directly transitions to active. + runtime.scheduleLoadOperation(TP, 10); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(ACTIVE, ctx.state); + assertEquals(10, ctx.epoch); + + // Schedule the unloading. + runtime.scheduleUnloadOperation(TP, OptionalInt.empty()); + assertEquals(CLOSED, ctx.state); + + // Verify that onUnloaded is called. + verify(coordinator, times(1)).onUnloaded(); + + // Verify that the listener is deregistered. + verify(writer, times(1)).deregisterListener( + eq(TP), + any(PartitionWriter.Listener.class) + ); + + // Getting the coordinator context fails because it no longer exists. + assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + } + @Test public void testScheduleUnloadingWhenContextDoesntExist() { MockTimer timer = new MockTimer(); @@ -717,7 +772,7 @@ public class CoordinatorRuntimeTest { // is asked to unload its state. The unload event is skipped in this case. // Schedule the unloading. - runtime.scheduleUnloadOperation(TP, 11); + runtime.scheduleUnloadOperation(TP, OptionalInt.of(11)); // Verify that onUnloaded is not called. verify(coordinator, times(0)).onUnloaded(); @@ -765,7 +820,7 @@ public class CoordinatorRuntimeTest { // Unloading with a previous epoch is a no-op. The coordinator stays // in active with the correct epoch. - runtime.scheduleUnloadOperation(TP, 0); + runtime.scheduleUnloadOperation(TP, OptionalInt.of(0)); assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); }