KAFKA-19372: StreamsGroup not subscribed to a topic when empty (#19901)

We should behave more like a consumer group and make sure to not be
subscribed to the input topics anymore when the last member leaves the
group. We don't do this right now because our topology is still
initialized even after the last member leaves the group.

This will allow:
* Offsets to expire and be cleaned up.
* Offsets to be deleted through admin API calls.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-06-04 20:55:14 +02:00 committed by GitHub
parent 678d456ad7
commit 25bc5f2cfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 0 deletions

View File

@ -55,6 +55,7 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING; import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
@ -765,6 +766,11 @@ public class StreamsGroup implements Group {
@Override @Override
public boolean isSubscribedToTopic(String topic) { public boolean isSubscribedToTopic(String topic) {
if (state.get() == EMPTY || state.get() == DEAD) {
// No topic subscriptions if the group is empty.
// This allows offsets to expire for empty groups.
return false;
}
Optional<ConfiguredTopology> maybeConfiguredTopology = configuredTopology.get(); Optional<ConfiguredTopology> maybeConfiguredTopology = configuredTopology.get();
if (maybeConfiguredTopology.isEmpty() || !maybeConfiguredTopology.get().isReady()) { if (maybeConfiguredTopology.isEmpty() || !maybeConfiguredTopology.get().isReady()) {
return false; return false;

View File

@ -977,6 +977,8 @@ public class StreamsGroupTest {
)); ));
streamsGroup.setTopology(topology); streamsGroup.setTopology(topology);
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember("member-id"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1")); assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
@ -991,6 +993,12 @@ public class StreamsGroupTest {
assertTrue(streamsGroup.isSubscribedToTopic("test-topic1")); assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2")); assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
streamsGroup.removeMember("member-id");
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
} }
@Test @Test