MINOR: prevent cleanUp() from being called while Streams is still shutting down (#10666)

Currently, KafkaStreams#cleanUp only throws an IllegalStateException if the state is RUNNING or REBALANCING. However, the application could be in the process of shutting down, in which case StreamThreads may still be running. We should also throw if the state is PENDING_ERROR or PENDING_SHUTDOWN, so that cleanup is only permitted in the CREATED, NOT_RUNNING, or ERROR states (see the caller-side sketch below).

Reviewers: Walker Carlson <wcarlson@confluent.io>, Guozhang Wang <guozhang@confluent.io>
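
For context, here is a minimal caller-side sketch of the ordering this change enforces. It is illustrative only and not part of the commit; the helper class name and the 30-second timeout are made up, while KafkaStreams#close(Duration) and KafkaStreams#cleanUp() are the existing public API.

import java.time.Duration;

import org.apache.kafka.streams.KafkaStreams;

// Illustrative sketch (not part of this commit): with this change, cleanUp()
// is only legal in the CREATED, NOT_RUNNING, or ERROR states, so callers must
// let close() finish before wiping local state.
public class CleanUpAfterClose {

    public static void closeAndWipe(final KafkaStreams streams) {
        // close(Duration.ZERO) returns immediately and can leave the client in
        // PENDING_SHUTDOWN, where cleanUp() now throws IllegalStateException.
        // Instead, block until all stream threads have actually shut down.
        if (streams.close(Duration.ofSeconds(30))) {
            streams.cleanUp(); // safe: no stream threads can touch the state directory
        }
    }
}

This mirrors the StandbyTaskEOSIntegrationTest change at the end of this diff, which switches from close(Duration.ZERO) to a blocking close() before calling cleanUp().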
A. Sophie Blee-Goldman 2021-05-13 16:16:35 -07:00 committed by GitHub
parent 6d1ae8bc00
commit 4153e754f1
4 changed files with 66 additions and 58 deletions

File: KafkaStreams.java

@@ -1444,7 +1444,7 @@ public class KafkaStreams implements AutoCloseable {
* @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunningOrRebalancing()) {
if (!(state == State.CREATED || state == State.NOT_RUNNING || state == State.ERROR)) {
throw new IllegalStateException("Cannot clean up while running.");
}
stateDirectory.clean();

File: StateDirectory.java

@@ -351,7 +351,7 @@ public class StateDirectory {
public synchronized void clean() {
try {
cleanRemovedTasksCalledByUser();
cleanStateAndTaskDirectoriesCalledByUser();
} catch (final Exception e) {
throw new StreamsException(e);
}
@@ -413,43 +413,31 @@
}
}
private void cleanRemovedTasksCalledByUser() throws Exception {
private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
if (!lockedTasksToOwner.isEmpty()) {
log.warn("Found some still-locked task directories when user requested to cleaning up the state, "
+ "since Streams is not running any more these will be ignored to complete the cleanup");
}
final AtomicReference<Exception> firstException = new AtomicReference<>();
for (final File taskDir : listAllTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
if (!lockedTasksToOwner.containsKey(id)) {
try {
if (lock(id)) {
log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);
Utils.delete(taskDir);
} else {
log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);
if (lockedTasksToOwner.containsKey(id)) {
log.warn("{} Task {} in state directory {} was still locked by {}",
logPrefix(), dirName, id, lockedTasksToOwner.get(id));
}
} catch (final OverlappingFileLockException | IOException exception) {
Utils.delete(taskDir);
} catch (final IOException exception) {
log.error(
String.format("%s Failed to delete state directory %s for task %s with exception:",
logPrefix(), dirName, id),
exception
);
firstException.compareAndSet(null, exception);
} finally {
try {
unlock(id);
// for manual user call, stream threads are not running so it is safe to delete
// the whole directory
Utils.delete(taskDir);
} catch (final IOException exception) {
log.error(
String.format("%s Failed to release lock on state directory %s for task %s with exception:",
logPrefix(), dirName, id),
exception
);
firstException.compareAndSet(null, exception);
}
}
}
}
final Exception exception = firstException.get();
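
Because the rendering above interleaves the removed and added lines of this hunk, here is a simplified, standalone sketch of what the cleanup loop looks like after the change: the per-task lock()/unlock() dance is gone, still-locked tasks are merely logged, and each task directory is deleted wholesale since no stream threads are running. The class and method names and the Map parameter are placeholders for illustration; the real logic lives in StateDirectory and uses its own fields (lockedTasksToOwner, listAllTaskDirectories(), log).

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.utils.Utils;

// Standalone sketch of the post-change behaviour, not the committed method itself.
final class UserCleanupSketch {

    static void deleteAllTaskDirectories(final File[] taskDirs,
                                         final Map<String, String> stillLockedTasks) throws Exception {
        final AtomicReference<Exception> firstException = new AtomicReference<>();
        for (final File taskDir : taskDirs) {
            try {
                if (stillLockedTasks.containsKey(taskDir.getName())) {
                    // only warn; no attempt is made to acquire the lock any more
                    System.out.printf("Task directory %s was still locked by %s%n",
                        taskDir, stillLockedTasks.get(taskDir.getName()));
                }
                // for a manual user call, stream threads are not running,
                // so it is safe to delete the whole directory
                Utils.delete(taskDir);
            } catch (final IOException exception) {
                // remember only the first failure but keep cleaning the remaining directories
                firstException.compareAndSet(null, exception);
            }
        }
        final Exception exception = firstException.get();
        if (exception != null) {
            throw exception; // surfaced to the caller and wrapped in StreamsException by clean()
        }
    }
}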

File: KafkaStreamsTest.java

@@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
@@ -96,6 +97,7 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@@ -235,8 +237,8 @@ public class KafkaStreamsTest {
EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
prepareStreamThread(streamThreadOne, 1, true);
prepareStreamThread(streamThreadTwo, 2, false);
@@ -298,7 +300,9 @@ public class KafkaStreamsTest {
);
}
private void prepareStreamThread(final StreamThread thread, final int threadId, final boolean terminable) throws Exception {
private void prepareStreamThread(final StreamThread thread,
final int threadId,
final boolean terminable) throws Exception {
final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
@@ -351,19 +355,21 @@ public class KafkaStreamsTest {
producer.close();
}
state.set(StreamThread.State.DEAD);
threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).anyTimes();
EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes();
thread.join();
if (terminable)
if (terminable) {
EasyMock.expectLastCall().anyTimes();
else
} else {
EasyMock.expectLastCall().andAnswer(() -> {
Thread.sleep(50L);
Thread.sleep(2000L);
return null;
}).anyTimes();
}
EasyMock.expect(thread.activeTasks()).andStubReturn(emptyList());
EasyMock.expect(thread.allTasks()).andStubReturn(Collections.emptyMap());
@@ -387,7 +393,7 @@ public class KafkaStreamsTest {
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streamsStateListener.numChanges == 2,
"Streams never started.");
Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());
@@ -439,7 +445,7 @@ public class KafkaStreamsTest {
streams.close();
TestUtils.waitForCondition(
waitForCondition(
() -> streamsStateListener.numChanges == 6,
"Streams never closed.");
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
@@ -453,7 +459,7 @@ public class KafkaStreamsTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
streams.close();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");
@@ -476,18 +482,18 @@ public class KafkaStreamsTest {
assertEquals(streams.state(), KafkaStreams.State.CREATED);
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown();
TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
}
TestUtils.waitForCondition(
waitForCondition(
() -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
"Streams never stopped"
);
@@ -495,7 +501,7 @@ public class KafkaStreamsTest {
streams.close();
}
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");
@@ -511,17 +517,17 @@ public class KafkaStreamsTest {
try {
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final GlobalStreamThread globalStreamThread = streams.globalStreamThread;
globalStreamThread.shutdown();
TestUtils.waitForCondition(
waitForCondition(
() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
"Thread never stopped.");
globalStreamThread.join();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.PENDING_ERROR,
"Thread never stopped."
);
@@ -529,7 +535,7 @@ public class KafkaStreamsTest {
streams.close();
}
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.ERROR,
"Thread never stopped."
);
@@ -568,7 +574,7 @@ public class KafkaStreamsTest {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
assertThat(streams.threads.size(), equalTo(oldSize + 1));
}
@@ -616,7 +622,7 @@ public class KafkaStreamsTest {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
"Kafka Streams client did not reach state RUNNING");
assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
assertThat(streams.threads.size(), equalTo(oldSize - 1));
@@ -707,7 +713,7 @@ public class KafkaStreamsTest {
public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
@@ -719,6 +725,20 @@ public class KafkaStreamsTest {
}
}
@Test
public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
streams.close(Duration.ZERO);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
}
@Test
public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
@@ -1027,13 +1047,13 @@ public class KafkaStreamsTest {
assertEquals(streams.state(), KafkaStreams.State.CREATED);
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started, state is " + streams.state());
streams.close();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");
}

File: StandbyTaskEOSIntegrationTest.java

@@ -170,8 +170,8 @@ public class StandbyTaskEOSIntegrationTest {
// Wait for the record to be processed
assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
streamInstanceOne.close(Duration.ZERO);
streamInstanceTwo.close(Duration.ZERO);
streamInstanceOne.close();
streamInstanceTwo.close();
streamInstanceOne.cleanUp();
streamInstanceTwo.cleanUp();