KAFKA-12514: Fix NPE in SubscriptionState (#10369)

Return null for partitionLag if there is no current position.
This was the desired semantics, the lack of the check was an
oversight.

Patches: KIP-695
Patches: a92b986c85

Reviewers: Walker Carlson <wcarlson@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
John Roesler 2021-03-21 20:56:16 -05:00 committed by GitHub
parent 2e2fad747d
commit 2bfe7a9d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 2 deletions

View File

@ -539,10 +539,13 @@ public class SubscriptionState {
public synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) {
TopicPartitionState topicPartitionState = assignedState(tp);
if (isolationLevel == IsolationLevel.READ_COMMITTED)
if (topicPartitionState.position == null) {
return null;
} else if (isolationLevel == IsolationLevel.READ_COMMITTED) {
return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset;
else
} else {
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position.offset;
}
}
synchronized Long partitionLead(TopicPartition tp) {

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncation;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
@ -794,4 +795,18 @@ public class SubscriptionStateTest {
assertFalse(state.isOffsetResetNeeded(tp0));
}
@Test
public void nullPositionLagOnNoPosition() {
state.assignFromUser(Collections.singleton(tp0));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
state.updateHighWatermark(tp0, 1L);
state.updateLastStableOffset(tp0, 1L);
assertNull(state.partitionLag(tp0, IsolationLevel.READ_UNCOMMITTED));
assertNull(state.partitionLag(tp0, IsolationLevel.READ_COMMITTED));
}
}