diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 0b478d4fb1b..bfab9a770f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -96,6 +96,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } initTopology(); processorContext.initialize(); + flushState(); lastFlush = time.milliseconds(); return stateMgr.changelogOffsets(); } @@ -138,6 +139,9 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } public void close(final boolean wipeStateStore) throws IOException { + if (!wipeStateStore) { + flushState(); + } stateMgr.close(); if (wipeStateStore) { try { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 24b09024d2f..5aa3a248b87 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -254,6 +254,10 @@ public class GlobalStateTaskTest { globalStateTask.initialize(); globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes())); time.sleep(flushInterval); // flush interval elapsed + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -269,6 +273,10 @@ public class GlobalStateTaskTest { globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L, "foo".getBytes(), "foo".getBytes())); time.sleep(flushInterval / 2); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -288,6 +296,10 @@ public class GlobalStateTaskTest { // 10000 records received since last flush => do not flush globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L, "foo".getBytes(), "foo".getBytes())); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -333,4 +345,25 @@ public class GlobalStateTaskTest { globalStateTask.close(true); assertFalse(stateMgr.baseDir().exists()); } + + @Test + public void shouldCheckpointDuringInitialization() { + globalStateTask.initialize(); + + assertTrue(stateMgr.checkpointWritten); + assertTrue(stateMgr.flushed); + } + + @Test + public void shouldCheckpointDuringClose() throws Exception { + globalStateTask.initialize(); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + + globalStateTask.close(false); + + assertTrue(stateMgr.checkpointWritten); + assertTrue(stateMgr.flushed); + } }