KAFKA-9545: Fix IllegalStateException in updateLags (#15096)

We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.

This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2024-01-02 16:35:31 +01:00 committed by GitHub
parent 65b1558532
commit e01eed32ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 10 deletions

View File

@ -952,11 +952,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final long pollLatency;
taskManager.resumePollingForPartitionsWithAvailableSpace();
try {
pollLatency = pollPhase();
} finally {
taskManager.updateLags();
}
// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
@ -981,6 +977,9 @@ public class StreamThread extends Thread implements ProcessingThread {
long totalPunctuateLatency = 0L;
if (state == State.RUNNING
|| (stateUpdaterEnabled && isStartingRunningOrPartitionAssigned())) {
taskManager.updateLags();
/*
* Within an iteration, after processing up to N (N initialized as 1 upon start up) records for each applicable tasks, check the current time:
* 1. If it is time to punctuate, do it;
@ -1101,11 +1100,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final long pollLatency;
taskManager.resumePollingForPartitionsWithAvailableSpace();
try {
pollLatency = pollPhase();
} finally {
taskManager.updateLags();
}
// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
@ -1119,6 +1114,8 @@ public class StreamThread extends Thread implements ProcessingThread {
long totalCommitLatency = 0L;
if (isRunning()) {
taskManager.updateLags();
checkStateUpdater();
taskManager.maybeThrowTaskExceptionsFromProcessingThreads();

View File

@ -3265,6 +3265,8 @@ public class StreamThreadTest {
thread = setUpThread(streamsConfigProps);
thread.setState(State.STARTING);
thread.setState(State.PARTITIONS_ASSIGNED);
thread.updateThreadMetadata("metadata");
thread.setState(State.RUNNING);
runOnce();