From 25bc5f2cfa0e42d735c6408ce66304fc3497542c Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 4 Jun 2025 20:55:14 +0200 Subject: [PATCH] KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901) We should behave more like a consumer group and make sure to not be subscribed to the input topics anymore when the last member leaves the group. We don't do this right now because our topology is still initialized even after the last member leaves the group. This will allow: * Offsets to expire and be cleaned up. * Offsets to be deleted through admin API calls. Reviewers: Bill Bejeck --- .../kafka/coordinator/group/streams/StreamsGroup.java | 6 ++++++ .../kafka/coordinator/group/streams/StreamsGroupTest.java | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 061f816296a..afc252a7fee 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -55,6 +55,7 @@ import java.util.Set; import java.util.TreeMap; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING; +import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING; @@ -765,6 +766,11 @@ public class StreamsGroup implements Group { @Override public boolean isSubscribedToTopic(String topic) { + if (state.get() == EMPTY || state.get() == DEAD) { + // No topic subscriptions if the group is empty. + // This allows offsets to expire for empty groups. + return false; + } Optional maybeConfiguredTopology = configuredTopology.get(); if (maybeConfiguredTopology.isEmpty() || !maybeConfiguredTopology.get().isReady()) { return false; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 0bd8caf3bbd..8966c936356 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -977,6 +977,8 @@ public class StreamsGroupTest { )); streamsGroup.setTopology(topology); + streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id")); + assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); @@ -991,6 +993,12 @@ public class StreamsGroupTest { assertTrue(streamsGroup.isSubscribedToTopic("test-topic1")); assertTrue(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); + + streamsGroup.removeMember("member-id"); + + assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); + assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); + assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); } @Test