diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 061f816296a..afc252a7fee 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -55,6 +55,7 @@ import java.util.Set; import java.util.TreeMap; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING; @@ -765,6 +766,11 @@ public class StreamsGroup implements Group { @Override public boolean isSubscribedToTopic(String topic) { + if (state.get() == EMPTY || state.get() == DEAD) { + // No topic subscriptions if the group is empty. + // This allows offsets to expire for empty groups. + return false; + } Optional maybeConfiguredTopology = configuredTopology.get(); if (maybeConfiguredTopology.isEmpty() || !maybeConfiguredTopology.get().isReady()) { return false; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 0bd8caf3bbd..8966c936356 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -977,6 +977,8 @@ public class StreamsGroupTest { )); streamsGroup.setTopology(topology); + streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id")); + assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); @@ -991,6 +993,12 @@ public class StreamsGroupTest { assertTrue(streamsGroup.isSubscribedToTopic("test-topic1")); assertTrue(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); + + streamsGroup.removeMember("member-id"); + + assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); + assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); + assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); } @Test