KAFKA-16973; Fix caught-up condition (#16367)

When a write operation does not have any records, the coordinator runtime checked whether the state machine is caught-up to decide whether the operation should wait until the state machine is committed up to the operation point or the operation should be completed. The current implementation assumes that there will always be a pending write operation waiting in the deferred queue when the state machine is not fully caught-up yet. This is true except when the state machine is just loaded and not caught-up yet.

This patch fixes the issue by always comparing the last written offset and the last committed offset.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-06-20 09:53:48 +02:00 committed by GitHub
parent f79e429044
commit ee550c4b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 85 additions and 4 deletions

View File

@ -60,7 +60,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
@ -900,9 +899,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (currentBatch != null) {
currentBatch.deferredEvents.add(event);
} else {
OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
if (pendingOffset.isPresent()) {
deferredEventQueue.add(pendingOffset.getAsLong(), event);
if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
} else {
event.complete(null);
}

View File

@ -3756,6 +3756,89 @@ public class CoordinatorRuntimeTest {
assertNotEquals(coordinator, ctx.coordinator);
}
@Test
public void testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
CoordinatorLoader<String> loader = new CoordinatorLoader<String>() {
@Override
public CompletableFuture<LoadSummary> load(
TopicPartition tp,
CoordinatorPlayback<String> coordinator
) {
coordinator.replay(
0,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
"record#0"
);
coordinator.replay(
0,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
"record#1"
);
coordinator.updateLastWrittenOffset(2L);
coordinator.updateLastCommittedOffset(1L);
return CompletableFuture.completedFuture(new LoadSummary(
0L,
0L,
0L,
2,
1
));
}
@Override
public void close() {}
};
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(loader)
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(1L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
// Schedule a write operation that does not generate any records.
CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
// The write operation should not be done.
assertFalse(write.isDone());
// Advance the last committed offset.
ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L);
// Verify the state.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
// The write operation should be completed.
assertEquals("response1", write.get(5, TimeUnit.SECONDS));
}
private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp