KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and closing (#18752)

To address the issue of not creating a checkpoint file during the
restoring and closing process, called the
GlobalStateUpdateTask.flushState() method in
GlobalStateUpdateTask.initialize() and GlobalStateUpdateTask.close()
methods. This will flush the state and create a checkpoint file thereby,
avoiding the need to completely restore the entire state.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Janindu Pathirana 2025-03-03 01:46:48 +05:30 committed by GitHub
parent 42a200bd39
commit 2e6e5304c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 0 deletions

View File

@ -96,6 +96,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
} }
initTopology(); initTopology();
processorContext.initialize(); processorContext.initialize();
flushState();
lastFlush = time.milliseconds(); lastFlush = time.milliseconds();
return stateMgr.changelogOffsets(); return stateMgr.changelogOffsets();
} }
@ -138,6 +139,9 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
} }
public void close(final boolean wipeStateStore) throws IOException { public void close(final boolean wipeStateStore) throws IOException {
if (!wipeStateStore) {
flushState();
}
stateMgr.close(); stateMgr.close();
if (wipeStateStore) { if (wipeStateStore) {
try { try {

View File

@ -254,6 +254,10 @@ public class GlobalStateTaskTest {
globalStateTask.initialize(); globalStateTask.initialize();
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes())); globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval); // flush interval elapsed time.sleep(flushInterval); // flush interval elapsed
stateMgr.checkpointWritten = false;
stateMgr.flushed = false;
globalStateTask.maybeCheckpoint(); globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets()); assertEquals(offsets, stateMgr.changelogOffsets());
@ -269,6 +273,10 @@ public class GlobalStateTaskTest {
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L, "foo".getBytes(), "foo".getBytes())); globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L, "foo".getBytes(), "foo".getBytes()));
time.sleep(flushInterval / 2); time.sleep(flushInterval / 2);
stateMgr.checkpointWritten = false;
stateMgr.flushed = false;
globalStateTask.maybeCheckpoint(); globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets()); assertEquals(offsets, stateMgr.changelogOffsets());
@ -288,6 +296,10 @@ public class GlobalStateTaskTest {
// 10000 records received since last flush => do not flush // 10000 records received since last flush => do not flush
globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L, "foo".getBytes(), "foo".getBytes())); globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L, "foo".getBytes(), "foo".getBytes()));
stateMgr.checkpointWritten = false;
stateMgr.flushed = false;
globalStateTask.maybeCheckpoint(); globalStateTask.maybeCheckpoint();
assertEquals(offsets, stateMgr.changelogOffsets()); assertEquals(offsets, stateMgr.changelogOffsets());
@ -333,4 +345,25 @@ public class GlobalStateTaskTest {
globalStateTask.close(true); globalStateTask.close(true);
assertFalse(stateMgr.baseDir().exists()); assertFalse(stateMgr.baseDir().exists());
} }
@Test
public void shouldCheckpointDuringInitialization() {
globalStateTask.initialize();
assertTrue(stateMgr.checkpointWritten);
assertTrue(stateMgr.flushed);
}
@Test
public void shouldCheckpointDuringClose() throws Exception {
globalStateTask.initialize();
stateMgr.checkpointWritten = false;
stateMgr.flushed = false;
globalStateTask.close(false);
assertTrue(stateMgr.checkpointWritten);
assertTrue(stateMgr.flushed);
}
} }