diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index ac594ebea05..21c5d624a6b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -60,7 +60,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; -import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; @@ -900,9 +899,8 @@ public class CoordinatorRuntime, U> implements Aut if (currentBatch != null) { currentBatch.deferredEvents.add(event); } else { - OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); - if (pendingOffset.isPresent()) { - deferredEventQueue.add(pendingOffset.getAsLong(), event); + if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) { + deferredEventQueue.add(coordinator.lastWrittenOffset(), event); } else { event.complete(null); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index 616a4f646ad..7a2adb3e8b0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -3756,6 +3756,89 @@ public class CoordinatorRuntimeTest { assertNotEquals(coordinator, ctx.coordinator); } + @Test + public void testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws ExecutionException, InterruptedException, TimeoutException { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + CoordinatorLoader loader = new CoordinatorLoader() { + @Override + public CompletableFuture load( + TopicPartition tp, + CoordinatorPlayback coordinator + ) { + coordinator.replay( + 0, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + "record#0" + ); + + coordinator.replay( + 0, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + "record#1" + ); + + coordinator.updateLastWrittenOffset(2L); + coordinator.updateLastCommittedOffset(1L); + + return CompletableFuture.completedFuture(new LoadSummary( + 0L, + 0L, + 0L, + 2, + 1 + )); + } + + @Override + public void close() {} + }; + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(loader) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(1L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList()); + + // Schedule a write operation that does not generate any records. + CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(Collections.emptyList(), "response1")); + + // The write operation should not be done. + assertFalse(write.isDone()); + + // Advance the last committed offset. + ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L); + + // Verify the state. + assertEquals(2L, ctx.coordinator.lastWrittenOffset()); + assertEquals(2L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList()); + + // The write operation should be completed. + assertEquals("response1", write.get(5, TimeUnit.SECONDS)); + } + private static , U> ArgumentMatcher> coordinatorMatcher( CoordinatorRuntime runtime, TopicPartition tp