diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4b5fad5d6e1..8d753be0e99 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
+ files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index b5d63499751..79c1b72237b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -37,6 +37,7 @@ public interface Group {
CONSUMER("consumer"),
CLASSIC("classic"),
SHARE("share"),
+ STREAMS("streams"),
UNKNOWN("unknown");
private final String name;
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index 23606593a6b..1cd9e4b0bd9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -73,6 +74,8 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String SHARE_GROUP_COUNT_METRIC_NAME = "group-count";
public static final String CONSUMER_GROUP_COUNT_STATE_TAG = "state";
public static final String SHARE_GROUP_COUNT_STATE_TAG = CONSUMER_GROUP_COUNT_STATE_TAG;
+ public static final String STREAMS_GROUP_COUNT_METRIC_NAME = "streams-group-count";
+ public static final String STREAMS_GROUP_COUNT_STATE_TAG = "state";
public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
@@ -80,6 +83,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances";
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances";
public static final String SHARE_GROUP_REBALANCES_SENSOR_NAME = "ShareGroupRebalances";
+ public static final String STREAMS_GROUP_REBALANCES_SENSOR_NAME = "StreamsGroupRebalances";
private final MetricName classicGroupCountMetricName;
private final MetricName consumerGroupCountMetricName;
@@ -92,6 +96,13 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
private final MetricName shareGroupCountEmptyMetricName;
private final MetricName shareGroupCountStableMetricName;
private final MetricName shareGroupCountDeadMetricName;
+ private final MetricName streamsGroupCountMetricName;
+ private final MetricName streamsGroupCountEmptyMetricName;
+ private final MetricName streamsGroupCountAssigningMetricName;
+ private final MetricName streamsGroupCountReconcilingMetricName;
+ private final MetricName streamsGroupCountStableMetricName;
+ private final MetricName streamsGroupCountDeadMetricName;
+ private final MetricName streamsGroupCountNotReadyMetricName;
private final MetricsRegistry registry;
private final Metrics metrics;
@@ -106,6 +117,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
}
+ @SuppressWarnings("MethodLength")
public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
this.registry = Objects.requireNonNull(registry);
this.metrics = Objects.requireNonNull(metrics);
@@ -190,6 +202,55 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.DEAD.toString()
);
+ streamsGroupCountMetricName = metrics.metricName(
+ GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The total number of groups using the streams rebalance protocol.",
+ Collections.singletonMap(GROUP_COUNT_PROTOCOL_TAG, Group.GroupType.STREAMS.toString())
+ );
+
+ streamsGroupCountEmptyMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in empty state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.EMPTY.toString())
+ );
+
+ streamsGroupCountAssigningMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in assigning state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.ASSIGNING.toString())
+ );
+
+ streamsGroupCountReconcilingMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in reconciling state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.RECONCILING.toString())
+ );
+
+ streamsGroupCountStableMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in stable state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.STABLE.toString())
+ );
+
+ streamsGroupCountDeadMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in dead state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.DEAD.toString())
+ );
+
+ streamsGroupCountNotReadyMetricName = metrics.metricName(
+ STREAMS_GROUP_COUNT_METRIC_NAME,
+ METRICS_GROUP,
+ "The number of streams groups in not ready state.",
+ Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.NOT_READY.toString())
+ );
+
registerGauges();
Sensor offsetCommitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
@@ -247,6 +308,15 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
METRICS_GROUP,
"The total number of share group rebalances",
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
+
+ Sensor streamsGroupRebalanceSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+ streamsGroupRebalanceSensor.add(new Meter(
+ metrics.metricName("streams-group-rebalance-rate",
+ METRICS_GROUP,
+ "The rate of streams group rebalances"),
+ metrics.metricName("streams-group-rebalance-count",
+ METRICS_GROUP,
+ "The total number of streams group rebalances")));
globalSensors = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor),
@@ -254,7 +324,8 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicGroupCompletedRebalancesSensor),
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor),
- Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, shareGroupRebalanceSensor)
+ Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, shareGroupRebalanceSensor),
+ Utils.mkEntry(STREAMS_GROUP_REBALANCES_SENSOR_NAME, streamsGroupRebalanceSensor)
));
}
@@ -278,6 +349,14 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
return shards.values().stream().mapToLong(shard -> shard.numConsumerGroups(state)).sum();
}
+ private long numStreamsGroups() {
+ return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numStreamsGroups).sum();
+ }
+
+ private long numStreamsGroups(StreamsGroupState state) {
+ return shards.values().stream().mapToLong(shard -> shard.numStreamsGroups(state)).sum();
+ }
+
private long numShareGroups() {
return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numShareGroups).sum();
}
@@ -309,7 +388,14 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
shareGroupCountMetricName,
shareGroupCountEmptyMetricName,
shareGroupCountStableMetricName,
- shareGroupCountDeadMetricName
+ shareGroupCountDeadMetricName,
+ streamsGroupCountMetricName,
+ streamsGroupCountEmptyMetricName,
+ streamsGroupCountAssigningMetricName,
+ streamsGroupCountReconcilingMetricName,
+ streamsGroupCountStableMetricName,
+ streamsGroupCountDeadMetricName,
+ streamsGroupCountNotReadyMetricName
).forEach(metrics::removeMetric);
Arrays.asList(
@@ -318,7 +404,8 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
OFFSET_DELETIONS_SENSOR_NAME,
CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
CONSUMER_GROUP_REBALANCES_SENSOR_NAME,
- SHARE_GROUP_REBALANCES_SENSOR_NAME
+ SHARE_GROUP_REBALANCES_SENSOR_NAME,
+ STREAMS_GROUP_REBALANCES_SENSOR_NAME
).forEach(metrics::removeSensor);
}
@@ -461,5 +548,40 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
shareGroupCountDeadMetricName,
(Gauge) (config, now) -> numShareGroups(ShareGroup.ShareGroupState.DEAD)
);
+
+ metrics.addMetric(
+ streamsGroupCountMetricName,
+ (Gauge) (config, now) -> numStreamsGroups()
+ );
+
+ metrics.addMetric(
+ streamsGroupCountEmptyMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.EMPTY)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountAssigningMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.ASSIGNING)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountReconcilingMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.RECONCILING)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountStableMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.STABLE)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountDeadMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.DEAD)
+ );
+
+ metrics.addMetric(
+ streamsGroupCountNotReadyMetricName,
+ (Gauge) (config, now) -> numStreamsGroups(StreamsGroupState.NOT_READY)
+ );
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 1ed75229f58..8b814bb0b23 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -23,6 +23,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
@@ -78,6 +79,11 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
*/
private final Map shareGroupGauges;
+ /**
+ * Streams group size gauge counters keyed by the metric name.
+ */
+ private final Map streamsGroupGauges;
+
/**
* All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards.
*/
@@ -119,6 +125,21 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
+ this.streamsGroupGauges = Utils.mkMap(
+ Utils.mkEntry(StreamsGroupState.EMPTY,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.ASSIGNING,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.RECONCILING,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.STABLE,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.DEAD,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
+ Utils.mkEntry(StreamsGroupState.NOT_READY,
+ new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
+ );
+
this.globalSensors = Objects.requireNonNull(globalSensors);
this.topicPartition = Objects.requireNonNull(topicPartition);
}
@@ -144,6 +165,20 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
this.consumerGroupGauges = consumerGroupGauges;
}
+ /**
+ * Increment the number of streams groups.
+ *
+ * @param state the streams group state.
+ */
+ public void incrementNumStreamsGroups(StreamsGroupState state) {
+ TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+ if (gaugeCounter != null) {
+ synchronized (gaugeCounter.timelineLong) {
+ gaugeCounter.timelineLong.increment();
+ }
+ }
+ }
+
/**
* Decrement the number of offsets.
*/
@@ -153,6 +188,20 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
}
}
+ /**
+ * Decrement the number of streams groups.
+ *
+ * @param state the streams group state.
+ */
+ public void decrementNumStreamsGroups(StreamsGroupState state) {
+ TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+ if (gaugeCounter != null) {
+ synchronized (gaugeCounter.timelineLong) {
+ gaugeCounter.timelineLong.decrement();
+ }
+ }
+ }
+
/**
* @return The number of offsets.
*/
@@ -205,6 +254,29 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
return consumerGroupGauges.values().stream()
.mapToLong(Long::longValue).sum();
}
+
+ /**
+ * Obtain the number of streams groups in the specified state.
+ *
+ * @param state the streams group state.
+ *
+ * @return The number of streams groups in `state`.
+ */
+ public long numStreamsGroups(StreamsGroupState state) {
+ TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+ if (gaugeCounter != null) {
+ return gaugeCounter.atomicLong.get();
+ }
+ return 0L;
+ }
+
+ /**
+ * @return The total number of streams groups.
+ */
+ public long numStreamsGroups() {
+ return streamsGroupGauges.values().stream()
+ .mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
+ }
@Override
public void record(String sensorName) {
@@ -246,6 +318,14 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
}
gaugeCounter.atomicLong.set(value);
});
+
+ this.streamsGroupGauges.forEach((__, gaugeCounter) -> {
+ long value;
+ synchronized (gaugeCounter.timelineLong) {
+ value = gaugeCounter.timelineLong.get(offset);
+ }
+ gaugeCounter.atomicLong.set(value);
+ });
}
/**
@@ -330,4 +410,66 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
}
}
}
+
+ /**
+ * Called when a streams group's state has changed. Increment/decrement
+ * the counter accordingly.
+ *
+ * @param oldState The previous state. null value means that it's a new group.
+ * @param newState The next state. null value means that the group has been removed.
+ */
+ public void onStreamsGroupStateTransition(
+ StreamsGroupState oldState,
+ StreamsGroupState newState
+ ) {
+ if (newState != null) {
+ switch (newState) {
+ case EMPTY:
+ incrementNumStreamsGroups(StreamsGroupState.EMPTY);
+ break;
+ case NOT_READY:
+ incrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+ break;
+ case ASSIGNING:
+ incrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+ break;
+ case RECONCILING:
+ incrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+ break;
+ case STABLE:
+ incrementNumStreamsGroups(StreamsGroupState.STABLE);
+ break;
+ case DEAD:
+ incrementNumStreamsGroups(StreamsGroupState.DEAD);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown new state for streams group: " + newState);
+ }
+ }
+
+ if (oldState != null) {
+ switch (oldState) {
+ case EMPTY:
+ decrementNumStreamsGroups(StreamsGroupState.EMPTY);
+ break;
+ case NOT_READY:
+ decrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+ break;
+ case ASSIGNING:
+ decrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+ break;
+ case RECONCILING:
+ decrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+ break;
+ case STABLE:
+ decrementNumStreamsGroups(StreamsGroupState.STABLE);
+ break;
+ case DEAD:
+ decrementNumStreamsGroups(StreamsGroupState.DEAD);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown old state for streams group: " + newState);
+ }
+ }
+ }
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index c44e5d89713..881f930d8a5 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -26,7 +26,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
-import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -116,21 +115,10 @@ public class StreamsCoordinatorRecordHelpers {
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
newPartitionMetadata.forEach((topicName, topicMetadata) -> {
- List partitionMetadata = new ArrayList<>();
- if (!topicMetadata.partitionRacks().isEmpty()) {
- topicMetadata.partitionRacks().forEach((partition, racks) ->
- partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(partition)
- .setRacks(racks.stream().sorted().toList())
- )
- );
- }
- partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
- .setPartitionMetadata(partitionMetadata)
);
});
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
new file mode 100644
index 00000000000..d161e64f599
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+ /**
+ * The protocol type for streams groups. There is only one protocol type, "streams".
+ */
+ private static final String PROTOCOL_TYPE = "streams";
+
+ public enum StreamsGroupState {
+ EMPTY("Empty"),
+ NOT_READY("NotReady"),
+ ASSIGNING("Assigning"),
+ RECONCILING("Reconciling"),
+ STABLE("Stable"),
+ DEAD("Dead");
+
+ private final String name;
+
+ private final String lowerCaseName;
+
+ StreamsGroupState(String name) {
+ this.name = name;
+ if (Objects.equals(name, "NotReady")) {
+ this.lowerCaseName = "not_ready";
+ } else {
+ this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ public String toLowerCaseString() {
+ return lowerCaseName;
+ }
+ }
+
+ public static class DeadlineAndEpoch {
+
+ static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+ public final long deadlineMs;
+ public final int epoch;
+
+ DeadlineAndEpoch(long deadlineMs, int epoch) {
+ this.deadlineMs = deadlineMs;
+ this.epoch = epoch;
+ }
+ }
+
+ private final LogContext logContext;
+ private final Logger log;
+
+ /**
+ * The snapshot registry.
+ */
+ private final SnapshotRegistry snapshotRegistry;
+
+ /**
+ * The group ID.
+ */
+ private final String groupId;
+
+ /**
+ * The group state.
+ */
+ private final TimelineObject state;
+
+ /**
+ * The group epoch. The epoch is incremented whenever the topology, topic metadata or the set of members changes and it will trigger
+ * the computation of a new assignment for the group.
+ */
+ private final TimelineInteger groupEpoch;
+
+ /**
+ * The group members.
+ */
+ private final TimelineHashMap members;
+
+ /**
+ * The static group members.
+ */
+ private final TimelineHashMap staticMembers;
+
+ /**
+ * The metadata associated with each subscribed topic name.
+ */
+ private final TimelineHashMap partitionMetadata;
+
+ /**
+ * The target assignment epoch. An assignment epoch smaller than the group epoch means that a new assignment is required. The assignment
+ * epoch is updated when a new assignment is installed.
+ */
+ private final TimelineInteger targetAssignmentEpoch;
+
+ /**
+ * The target assignment per member ID.
+ */
+ private final TimelineHashMap targetAssignment;
+
+ /**
+ * These maps map each active/standby/warmup task to the process ID(s) of their current owner.
+ * The mapping is of the form subtopology -> partition -> memberId
.
+ * When a member revokes a partition, it removes its process ID from this map.
+ * When a member gets a partition, it adds its process ID to this map.
+ */
+ private final TimelineHashMap> currentActiveTaskToProcessId;
+ private final TimelineHashMap>> currentStandbyTaskToProcessIds;
+ private final TimelineHashMap>> currentWarmupTaskToProcessIds;
+
+ /**
+ * The coordinator metrics.
+ */
+ private final GroupCoordinatorMetricsShard metrics;
+
+ /**
+ * The Streams topology.
+ */
+ private final TimelineObject> topology;
+
+ /**
+ * The configured topology including resolved regular expressions.
+ */
+ private final TimelineObject> configuredTopology;
+
+ /**
+ * The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it.
+ * The metadata refresh time is considered as a soft state (read that it is not stored in a timeline data structure). It is like this
+ * because it is not persisted to the log. The group epoch is here to ensure that the metadata refresh deadline is invalidated if the
+ * group epoch does not correspond to the current group epoch. This can happen if the metadata refresh deadline is updated after having
+ * refreshed the metadata but the write operation failed. In this case, the time is not automatically rolled back.
+ */
+ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+ public StreamsGroup(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ String groupId,
+ GroupCoordinatorMetricsShard metrics
+ ) {
+ this.log = logContext.logger(StreamsGroup.class);
+ this.logContext = logContext;
+ this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+ this.groupId = Objects.requireNonNull(groupId);
+ this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+ this.groupEpoch = new TimelineInteger(snapshotRegistry);
+ this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+ this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.metrics = Objects.requireNonNull(metrics);
+ this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
+ this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
+ }
+
+ /**
+ * @return The group type (Streams).
+ */
+ @Override
+ public GroupType type() {
+ return GroupType.STREAMS;
+ }
+
+ /**
+ * @return The current state as a String.
+ */
+ @Override
+ public String stateAsString() {
+ return state.get().toString();
+ }
+
+ /**
+ * @return The current state as a String with given committedOffset.
+ */
+ public String stateAsString(long committedOffset) {
+ return state.get(committedOffset).toString();
+ }
+
+ /**
+ * @return the group formatted as a list group response based on the committed offset.
+ */
+ public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
+ return new ListGroupsResponseData.ListedGroup()
+ .setGroupId(groupId)
+ .setProtocolType(PROTOCOL_TYPE)
+ .setGroupState(state.get(committedOffset).toString())
+ .setGroupType(type().toString());
+ }
+
+ public Optional configuredTopology() {
+ return configuredTopology.get();
+ }
+
+ public Optional topology() {
+ return topology.get();
+ }
+
+ public void setTopology(StreamsTopology topology) {
+ this.topology.set(Optional.of(topology));
+ maybeUpdateConfiguredTopology();
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * @return The group ID.
+ */
+ @Override
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * @return The current state.
+ */
+ public StreamsGroupState state() {
+ return state.get();
+ }
+
+ /**
+ * @return The group epoch.
+ */
+ public int groupEpoch() {
+ return groupEpoch.get();
+ }
+
+ /**
+ * Sets the group epoch.
+ *
+ * @param groupEpoch The new group epoch.
+ */
+ public void setGroupEpoch(int groupEpoch) {
+ this.groupEpoch.set(groupEpoch);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * @return The target assignment epoch.
+ */
+ public int assignmentEpoch() {
+ return targetAssignmentEpoch.get();
+ }
+
+ /**
+ * Sets the assignment epoch.
+ *
+ * @param targetAssignmentEpoch The new assignment epoch.
+ */
+ public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+ this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Get member ID of a static member that matches the given group instance ID.
+ *
+ * @param groupInstanceId The group instance ID.
+ * @return The member ID corresponding to the given instance ID or null if it does not exist
+ */
+ public String staticMemberId(String groupInstanceId) {
+ return staticMembers.get(groupInstanceId);
+ }
+
+ /**
+ * Gets or creates a new member but without adding it to the group. Adding a member is done via the
+ * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+ *
+ * @param memberId The member ID.
+ * @param createIfNotExists Booleans indicating whether the member must be created if it does not exist.
+ * @return A StreamsGroupMember.
+ */
+ public StreamsGroupMember getOrMaybeCreateMember(
+ String memberId,
+ boolean createIfNotExists
+ ) {
+ StreamsGroupMember member = members.get(memberId);
+ if (member != null) {
+ return member;
+ }
+
+ if (!createIfNotExists) {
+ throw new UnknownMemberIdException(
+ String.format("Member %s is not a member of group %s.", memberId, groupId)
+ );
+ }
+
+ return new StreamsGroupMember.Builder(memberId).build();
+ }
+
+ /**
+ * Gets a static member.
+ *
+ * @param instanceId The group instance ID.
+ * @return The member corresponding to the given instance ID or null if it does not exist
+ */
+ public StreamsGroupMember staticMember(String instanceId) {
+ String existingMemberId = staticMemberId(instanceId);
+ return existingMemberId == null ? null : getOrMaybeCreateMember(existingMemberId, false);
+ }
+
+ /**
+ * Adds or updates the member.
+ *
+ * @param newMember The new member state.
+ */
+ public void updateMember(StreamsGroupMember newMember) {
+ if (newMember == null) {
+ throw new IllegalArgumentException("newMember cannot be null.");
+ }
+ StreamsGroupMember oldMember = members.put(newMember.memberId(), newMember);
+ maybeUpdateTaskProcessId(oldMember, newMember);
+ updateStaticMember(newMember);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Updates the member ID stored against the instance ID if the member is a static member.
+ *
+ * @param newMember The new member state.
+ */
+ private void updateStaticMember(StreamsGroupMember newMember) {
+ if (newMember.instanceId() != null && newMember.instanceId().isPresent()) {
+ staticMembers.put(newMember.instanceId().get(), newMember.memberId());
+ }
+ }
+
+ /**
+ * Remove the member from the group.
+ *
+ * @param memberId The member ID to remove.
+ */
+ public void removeMember(String memberId) {
+ StreamsGroupMember oldMember = members.remove(memberId);
+ maybeRemoveTaskProcessId(oldMember);
+ removeStaticMember(oldMember);
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Remove the static member mapping if the removed member is static.
+ *
+ * @param oldMember The member to remove.
+ */
+ private void removeStaticMember(StreamsGroupMember oldMember) {
+ if (oldMember.instanceId() != null && oldMember.instanceId().isPresent()) {
+ staticMembers.remove(oldMember.instanceId().get());
+ }
+ }
+
+ /**
+ * Returns true if the member exists.
+ *
+ * @param memberId The member ID.
+ * @return A boolean indicating whether the member exists or not.
+ */
+ public boolean hasMember(String memberId) {
+ return members.containsKey(memberId);
+ }
+
+ /**
+ * @return The number of members.
+ */
+ public int numMembers() {
+ return members.size();
+ }
+
+ /**
+ * @return An immutable map containing all the members keyed by their ID.
+ */
+ public Map members() {
+ return Collections.unmodifiableMap(members);
+ }
+
+ /**
+ * @return An immutable map containing all the static members keyed by instance ID.
+ */
+ public Map staticMembers() {
+ return Collections.unmodifiableMap(staticMembers);
+ }
+
+ /**
+ * Returns the target assignment of the member.
+ *
+ * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not exist.
+ */
+ public TasksTuple targetAssignment(String memberId) {
+ return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+ }
+
+ /**
+ * Updates the target assignment of a member.
+ *
+ * @param memberId The member ID.
+ * @param newTargetAssignment The new target assignment.
+ */
+ public void updateTargetAssignment(String memberId, TasksTuple newTargetAssignment) {
+ targetAssignment.put(memberId, newTargetAssignment);
+ }
+
+ /**
+ * @return An immutable map containing all the target assignment keyed by member ID.
+ */
+ public Map targetAssignment() {
+ return Collections.unmodifiableMap(targetAssignment);
+ }
+
+ /**
+ * Returns the current process ID of a task or null if the task does not have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The task ID.
+ * @return The process ID or null.
+ */
+ public String currentActiveTaskProcessId(
+ String subtopologyId, int taskId
+ ) {
+ Map tasks = currentActiveTaskToProcessId.get(subtopologyId);
+ if (tasks == null) {
+ return null;
+ } else {
+ return tasks.getOrDefault(taskId, null);
+ }
+ }
+
+ /**
+ * Returns the current process IDs of a task or empty set if the task does not have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The task ID.
+ * @return The process IDs or empty set.
+ */
+ public Set currentStandbyTaskProcessIds(
+ String subtopologyId, int taskId
+ ) {
+ Map> tasks = currentStandbyTaskToProcessIds.get(subtopologyId);
+ if (tasks == null) {
+ return Collections.emptySet();
+ } else {
+ return tasks.getOrDefault(taskId, Collections.emptySet());
+ }
+ }
+
+ /**
+ * Returns the current process ID of a task or empty set if the task does not have one.
+ *
+ * @param subtopologyId The topic ID.
+ * @param taskId The process ID.
+ * @return The member IDs or empty set.
+ */
+ public Set currentWarmupTaskProcessIds(
+ String subtopologyId, int taskId
+ ) {
+ Map> tasks = currentWarmupTaskToProcessIds.get(subtopologyId);
+ if (tasks == null) {
+ return Collections.emptySet();
+ } else {
+ return tasks.getOrDefault(taskId, Collections.emptySet());
+ }
+ }
+
+ /**
+ * @return An immutable map of partition metadata for each topic that are inputs for this streams group.
+ */
+ public Map partitionMetadata() {
+ return Collections.unmodifiableMap(partitionMetadata);
+ }
+
+ /**
+ * Updates the partition metadata. This replaces the previous one.
+ *
+ * @param partitionMetadata The new partition metadata.
+ */
+ public void setPartitionMetadata(
+ Map partitionMetadata
+ ) {
+ this.partitionMetadata.clear();
+ this.partitionMetadata.putAll(partitionMetadata);
+ maybeUpdateConfiguredTopology();
+ maybeUpdateGroupState();
+ }
+
+ /**
+ * Computes the partition metadata based on the current topology and the current topics image.
+ *
+ * @param topicsImage The current metadata for all available topics.
+ * @param topology The current metadata for the Streams topology
+ * @return An immutable map of partition metadata for each topic that the Streams topology is using (besides non-repartition sink topics)
+ */
+ public Map computePartitionMetadata(
+ TopicsImage topicsImage,
+ StreamsTopology topology
+ ) {
+ Set requiredTopicNames = topology.requiredTopics();
+
+ // Create the topic metadata for each subscribed topic.
+ Map newPartitionMetadata = new HashMap<>(requiredTopicNames.size());
+
+ requiredTopicNames.forEach(topicName -> {
+ TopicImage topicImage = topicsImage.getTopic(topicName);
+ if (topicImage != null) {
+ newPartitionMetadata.put(topicName, new TopicMetadata(
+ topicImage.id(),
+ topicImage.name(),
+ topicImage.partitions().size())
+ );
+ }
+ });
+
+ return Collections.unmodifiableMap(newPartitionMetadata);
+ }
+
+ /**
+ * Updates the metadata refresh deadline.
+ *
+ * @param deadlineMs The deadline in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+ public void setMetadataRefreshDeadline(
+ long deadlineMs,
+ int groupEpoch
+ ) {
+ this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
+ }
+
+ /**
+ * Requests a metadata refresh.
+ */
+ public void requestMetadataRefresh() {
+ this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ }
+
+ /**
+ * Checks if a metadata refresh is required. A refresh is required in two cases: 1) The deadline is smaller or equal to the current
+ * time; 2) The group epoch associated with the deadline is larger than the current group epoch. This means that the operations which
+ * updated the deadline failed.
+ *
+ * @param currentTimeMs The current time in milliseconds.
+ * @return A boolean indicating whether a refresh is required or not.
+ */
+ public boolean hasMetadataExpired(long currentTimeMs) {
+ return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch;
+ }
+
+ /**
+ * @return The metadata refresh deadline.
+ */
+ public DeadlineAndEpoch metadataRefreshDeadline() {
+ return metadataRefreshDeadline;
+ }
+
+ /**
+ * Validates the OffsetCommit request.
+ *
+ * @param memberId The member ID.
+ * @param groupInstanceId The group instance ID.
+ * @param memberEpoch The member epoch.
+ * @param isTransactional Whether the offset commit is transactional or not.
+ * @param apiVersion The api version.
+ * @throws UnknownMemberIdException If the member is not found.
+ * @throws StaleMemberEpochException If the provided member epoch doesn't match the actual member epoch.
+ */
+ @Override
+ public void validateOffsetCommit(
+ String memberId,
+ String groupInstanceId,
+ int memberEpoch,
+ boolean isTransactional,
+ short apiVersion
+ ) throws UnknownMemberIdException, StaleMemberEpochException {
+ // When the member epoch is -1, the request comes from either the admin client
+ // or a consumer which does not use the group management facility. In this case,
+ // the request can commit offsets if the group is empty.
+ if (memberEpoch < 0 && members().isEmpty()) return;
+
+ // The TxnOffsetCommit API does not require the member ID, the generation ID and the group instance ID fields.
+ // Hence, they are only validated if any of them is provided
+ if (isTransactional && memberEpoch == JoinGroupRequest.UNKNOWN_GENERATION_ID &&
+ memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && groupInstanceId == null)
+ return;
+
+ final StreamsGroupMember member = getOrMaybeCreateMember(memberId, false);
+
+ // If the commit is not transactional and the member uses the new streams protocol (KIP-1071),
+ // the member should be using the OffsetCommit API version >= 9.
+ if (!isTransactional && apiVersion < 9) {
+ throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
+ "by members using the streams group protocol");
+ }
+
+ validateMemberEpoch(memberEpoch, member.memberEpoch());
+ }
+
+ /**
+ * Validates the OffsetFetch request.
+ *
+ * @param memberId The member ID for streams groups.
+ * @param memberEpoch The member epoch for streams groups.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
+ */
+ @Override
+ public void validateOffsetFetch(
+ String memberId,
+ int memberEpoch,
+ long lastCommittedOffset
+ ) throws UnknownMemberIdException, StaleMemberEpochException {
+ // When the member ID is null and the member epoch is -1, the request either comes
+ // from the admin client or from a client which does not provide them. In this case,
+ // the fetch request is accepted.
+ if (memberId == null && memberEpoch < 0) {
+ return;
+ }
+
+ final StreamsGroupMember member = members.get(memberId, lastCommittedOffset);
+ if (member == null) {
+ throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
+ memberId, groupId));
+ }
+ validateMemberEpoch(memberEpoch, member.memberEpoch());
+ }
+
+ /**
+ * Validates the OffsetDelete request.
+ */
+ @Override
+ public void validateOffsetDelete() {
+ }
+
+ /**
+ * Validates the DeleteGroups request.
+ */
+ @Override
+ public void validateDeleteGroup() throws ApiException {
+ if (state() != StreamsGroupState.EMPTY) {
+ throw Errors.NON_EMPTY_GROUP.exception();
+ }
+ }
+
+ @Override
+ public boolean isSubscribedToTopic(String topic) {
+ Optional maybeConfiguredTopology = configuredTopology.get();
+ if (maybeConfiguredTopology.isEmpty() || !maybeConfiguredTopology.get().isReady()) {
+ return false;
+ }
+ for (ConfiguredSubtopology sub : maybeConfiguredTopology.get().subtopologies().orElse(new TreeMap<>()).values()) {
+ if (sub.sourceTopics().contains(topic) || sub.repartitionSourceTopics().containsKey(topic)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ *
+ * @param records The list of records.
+ */
+ @Override
+ public void createGroupTombstoneRecords(List records) {
+ members().forEach((memberId, member) ->
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(), memberId))
+ );
+
+ members().forEach((memberId, member) ->
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(), memberId))
+ );
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
+ members().forEach((memberId, member) ->
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(), memberId))
+ );
+
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return state() == StreamsGroupState.EMPTY;
+ }
+
+ /**
+ * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty if no such condition exists.
+ */
+ @Override
+ public Optional offsetExpirationCondition() {
+ return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
+ }
+
+ @Override
+ public boolean isInStates(Set statesFilter, long committedOffset) {
+ return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
+ }
+
+ /**
+ * Throws a StaleMemberEpochException if the received member epoch does not match the expected member epoch.
+ */
+ private void validateMemberEpoch(
+ int receivedMemberEpoch,
+ int expectedMemberEpoch
+ ) throws StaleMemberEpochException {
+ if (receivedMemberEpoch != expectedMemberEpoch) {
+ throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
+ }
+ }
+
+ /**
+ * Updates the current state of the group.
+ */
+ private void maybeUpdateGroupState() {
+ StreamsGroupState previousState = state.get();
+ StreamsGroupState newState = STABLE;
+ if (members.isEmpty()) {
+ newState = EMPTY;
+ } else if (topology() == null || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
+ newState = NOT_READY;
+ } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+ newState = ASSIGNING;
+ } else {
+ for (StreamsGroupMember member : members.values()) {
+ if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+ newState = RECONCILING;
+ break;
+ }
+ }
+ }
+
+ state.set(newState);
+ metrics.onStreamsGroupStateTransition(previousState, newState);
+ }
+
+ private void maybeUpdateConfiguredTopology() {
+ if (topology.get().isPresent()) {
+ final StreamsTopology streamsTopology = topology.get().get();
+
+ log.info("[GroupId {}] Configuring the topology {}", groupId, streamsTopology);
+ this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext, streamsTopology, partitionMetadata)));
+
+ } else {
+ configuredTopology.set(Optional.empty());
+ }
+ }
+
+ /**
+ * Updates the tasks process IDs based on the old and the new member.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ */
+ private void maybeUpdateTaskProcessId(
+ StreamsGroupMember oldMember,
+ StreamsGroupMember newMember
+ ) {
+ maybeRemoveTaskProcessId(oldMember);
+ addTaskProcessId(
+ newMember.assignedTasks(),
+ newMember.processId()
+ );
+ addTaskProcessId(
+ newMember.tasksPendingRevocation(),
+ newMember.processId()
+ );
+ }
+
+ /**
+ * Removes the task process IDs for the provided member.
+ *
+ * @param oldMember The old member.
+ */
+ private void maybeRemoveTaskProcessId(
+ StreamsGroupMember oldMember
+ ) {
+ if (oldMember != null) {
+ removeTaskProcessIds(oldMember.assignedTasks(), oldMember.processId());
+ removeTaskProcessIds(oldMember.tasksPendingRevocation(), oldMember.processId());
+ }
+ }
+
+ void removeTaskProcessIds(
+ TasksTuple tasks,
+ String processId
+ ) {
+ if (tasks != null) {
+ removeTaskProcessIds(tasks.activeTasks(), currentActiveTaskToProcessId, processId);
+ removeTaskProcessIdsFromSet(tasks.standbyTasks(), currentStandbyTaskToProcessIds, processId);
+ removeTaskProcessIdsFromSet(tasks.warmupTasks(), currentWarmupTaskToProcessIds, processId);
+ }
+ }
+
+ /**
+ * Removes the task process IDs based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param expectedProcessId The expected process ID.
+ * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ */
+ private void removeTaskProcessIds(
+ Map> assignment,
+ TimelineHashMap> currentTasksProcessId,
+ String expectedProcessId
+ ) {
+ assignment.forEach((subtopologyId, assignedPartitions) -> {
+ currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull != null) {
+ assignedPartitions.forEach(partitionId -> {
+ String prevValue = partitionsOrNull.remove(partitionId);
+ if (!Objects.equals(prevValue, expectedProcessId)) {
+ throw new IllegalStateException(
+ String.format("Cannot remove the process ID %s from task %s_%s because the partition is " +
+ "still owned at a different process ID %s", expectedProcessId, subtopologyId, partitionId, prevValue));
+ }
+ });
+ if (partitionsOrNull.isEmpty()) {
+ return null;
+ } else {
+ return partitionsOrNull;
+ }
+ } else {
+ throw new IllegalStateException(
+ String.format("Cannot remove the process ID %s from %s because it does not have any processId",
+ expectedProcessId, subtopologyId));
+ }
+ });
+ });
+ }
+
+ /**
+ * Removes the task process IDs based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param processIdToRemove The expected process ID.
+ * @throws IllegalStateException if the process ID does not match the expected one. package-private for testing.
+ */
+ private void removeTaskProcessIdsFromSet(
+ Map> assignment,
+ TimelineHashMap>> currentTasksProcessId,
+ String processIdToRemove
+ ) {
+ assignment.forEach((subtopologyId, assignedPartitions) -> {
+ currentTasksProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull != null) {
+ assignedPartitions.forEach(partitionId -> {
+ if (!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
+ throw new IllegalStateException(
+ String.format("Cannot remove the process ID %s from task %s_%s because the task is " +
+ "not owned by this process ID", processIdToRemove, subtopologyId, partitionId));
+ }
+ });
+ if (partitionsOrNull.isEmpty()) {
+ return null;
+ } else {
+ return partitionsOrNull;
+ }
+ } else {
+ throw new IllegalStateException(
+ String.format("Cannot remove the process ID %s from %s because it does not have any process ID",
+ processIdToRemove, subtopologyId));
+ }
+ });
+ });
+ }
+
+ /**
+ * Adds the partitions epoch based on the provided assignment.
+ *
+ * @param tasks The assigned tasks.
+ * @param processId The process ID.
+ * @throws IllegalStateException if the partition already has an epoch assigned. package-private for testing.
+ */
+ void addTaskProcessId(
+ TasksTuple tasks,
+ String processId
+ ) {
+ if (tasks != null) {
+ addTaskProcessId(tasks.activeTasks(), processId, currentActiveTaskToProcessId);
+ addTaskProcessIdToSet(tasks.standbyTasks(), processId, currentStandbyTaskToProcessIds);
+ addTaskProcessIdToSet(tasks.warmupTasks(), processId, currentWarmupTaskToProcessIds);
+ }
+ }
+
+ private void addTaskProcessId(
+ Map> tasks,
+ String processId,
+ TimelineHashMap> currentTaskProcessId
+ ) {
+ tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
+ currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull == null) {
+ partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size());
+ }
+ for (Integer partitionId : assignedTaskPartitions) {
+ String prevValue = partitionsOrNull.put(partitionId, processId);
+ if (prevValue != null) {
+ throw new IllegalStateException(
+ String.format("Cannot set the process ID of %s-%s to %s because the partition is " +
+ "still owned by process ID %s", subtopologyId, partitionId, processId, prevValue));
+ }
+ }
+ return partitionsOrNull;
+ });
+ });
+ }
+
+ private void addTaskProcessIdToSet(
+ Map> tasks,
+ String processId,
+ TimelineHashMap>> currentTaskProcessId
+ ) {
+ tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
+ currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull == null) {
+ partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size());
+ }
+ for (Integer partitionId : assignedTaskPartitions) {
+ partitionsOrNull.computeIfAbsent(partitionId, ___ -> new HashSet<>()).add(processId);
+ }
+ return partitionsOrNull;
+ });
+ });
+ }
+
+ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
+ long committedOffset
+ ) {
+ StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setGroupEpoch(groupEpoch.get(committedOffset))
+ .setGroupState(state.get(committedOffset).toString())
+ .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
+ .setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
+ members.entrySet(committedOffset).forEach(
+ entry -> describedGroup.members().add(
+ entry.getValue().asStreamsGroupDescribeMember(
+ targetAssignment.get(entry.getValue().memberId(), committedOffset)
+ )
+ )
+ );
+ return describedGroup;
+ }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
index 19a988373ef..f4fa3dc7aa7 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -19,10 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* Immutable topic metadata, representing the current state of a topic in the broker.
@@ -30,15 +27,12 @@ import java.util.Set;
* @param id The topic ID.
* @param name The topic name.
* @param numPartitions The number of partitions.
- * @param partitionRacks Map of every partition ID to a set of its rack IDs, if they exist. If rack information is unavailable for all
- * partitions, this is an empty map.
*/
-public record TopicMetadata(Uuid id, String name, int numPartitions, Map> partitionRacks) {
+public record TopicMetadata(Uuid id, String name, int numPartitions) {
public TopicMetadata(Uuid id,
String name,
- int numPartitions,
- Map> partitionRacks) {
+ int numPartitions) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
@@ -51,23 +45,12 @@ public record TopicMetadata(Uuid id, String name, int numPartitions, Map> partitionRacks = new HashMap<>();
- for (StreamsGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) {
- partitionRacks.put(
- partitionMetadata.partition(),
- Set.copyOf(partitionMetadata.racks())
- );
- }
-
return new TopicMetadata(
record.topicId(),
record.topicName(),
- record.numPartitions(),
- partitionRacks);
+ record.numPartitions());
}
}
diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
index 1f5eb8e8dcb..f9be55b9e42 100644
--- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
+++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
@@ -28,14 +28,7 @@
{ "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
- "about": "The number of partitions of the topic." },
- { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
- "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [
- { "name": "Partition", "versions": "0+", "type": "int32",
- "about": "The partition number." },
- { "name": "Racks", "versions": "0+", "type": "[]string",
- "about": "The set of racks that the partition is mapped to." }
- ]}
+ "about": "The number of partitions of the topic." }
]}
]
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 05da88f9f7a..e20640fb129 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry;
@@ -47,6 +48,7 @@ import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
@@ -129,7 +131,37 @@ public class GroupCoordinatorMetricsTest {
GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in dead state.",
"protocol", Group.GroupType.SHARE.toString(),
- "state", GroupState.DEAD.toString())
+ "state", GroupState.DEAD.toString()),
+ metrics.metricName(
+ "group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("protocol", Group.GroupType.STREAMS.toString())),
+ metrics.metricName("streams-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
+ metrics.metricName("streams-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.EMPTY.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.ASSIGNING.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.RECONCILING.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.STABLE.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.DEAD.toString())),
+ metrics.metricName(
+ "streams-group-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ Collections.singletonMap("state", StreamsGroupState.NOT_READY.toString()))
));
try {
@@ -145,7 +177,7 @@ public class GroupCoordinatorMetricsTest {
));
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry);
- expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
+ expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName), metricName + " is missing"));
}
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet());
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
@@ -195,6 +227,10 @@ public class GroupCoordinatorMetricsTest {
IntStream.range(0, 5).forEach(__ -> shard0.incrementNumShareGroups(ShareGroup.ShareGroupState.STABLE));
IntStream.range(0, 5).forEach(__ -> shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY));
IntStream.range(0, 3).forEach(__ -> shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD));
+
+ IntStream.range(0, 5).forEach(__ -> shard0.incrementNumStreamsGroups(StreamsGroupState.STABLE));
+ IntStream.range(0, 5).forEach(__ -> shard1.incrementNumStreamsGroups(StreamsGroupState.EMPTY));
+ IntStream.range(0, 3).forEach(__ -> shard1.decrementNumStreamsGroups(StreamsGroupState.DEAD));
assertEquals(4, shard0.numClassicGroups());
assertEquals(5, shard1.numClassicGroups());
@@ -228,6 +264,14 @@ public class GroupCoordinatorMetricsTest {
metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "share")),
7
);
+
+ assertEquals(5, shard0.numStreamsGroups());
+ assertEquals(2, shard1.numStreamsGroups());
+ assertGaugeValue(
+ metrics,
+ metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "streams")),
+ 7
+ );
}
@Test
@@ -269,6 +313,18 @@ public class GroupCoordinatorMetricsTest {
"The total number of share group rebalances",
"protocol", "share"
), 50);
+
+ shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);
+ assertMetricValue(metrics, metrics.metricName(
+ "streams-group-rebalance-rate",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ "The rate of streams group rebalances"
+ ), 5.0 / 3.0);
+ assertMetricValue(metrics, metrics.metricName(
+ "streams-group-rebalance-count",
+ GroupCoordinatorMetrics.METRICS_GROUP,
+ "The total number of streams group rebalances"
+ ), 50);
}
private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 489d4fa0254..ab55dd3239b 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -257,8 +257,8 @@ class StreamsCoordinatorRecordHelpersTest {
Uuid uuid1 = Uuid.randomUuid();
Uuid uuid2 = Uuid.randomUuid();
Map newPartitionMetadata = Map.of(
- TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0, Set.of(RACK_1, RACK_2))),
- TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1, Set.of(RACK_3)))
+ TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
+ TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
);
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
@@ -266,21 +266,11 @@ class StreamsCoordinatorRecordHelpersTest {
.setTopicId(uuid1)
.setTopicName(TOPIC_1)
.setNumPartitions(1)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(0)
- .setRacks(List.of(RACK_1, RACK_2))
- ))
);
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid2)
.setTopicName(TOPIC_2)
.setNumPartitions(2)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(1)
- .setRacks(List.of(RACK_3))
- ))
);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
new file mode 100644
index 00000000000..47dcf552fa6
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -0,0 +1,1099 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
+import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StreamsGroupTest {
+
+ private static final LogContext LOG_CONTEXT = new LogContext();
+
+ private StreamsGroup createStreamsGroup(String groupId) {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ return new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ groupId,
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ }
+
+ @Test
+ public void testGetOrCreateMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ // Create a member.
+ member = streamsGroup.getOrMaybeCreateMember("member-id", true);
+ assertEquals("member-id", member.memberId());
+
+ // Add member to the group.
+ streamsGroup.updateMember(member);
+
+ // Get that member back.
+ member = streamsGroup.getOrMaybeCreateMember("member-id", false);
+ assertEquals("member-id", member.memberId());
+
+ assertThrows(UnknownMemberIdException.class, () ->
+ streamsGroup.getOrMaybeCreateMember("does-not-exist", false));
+ }
+
+ @Test
+ public void testUpdateMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+ member = new StreamsGroupMember.Builder(member).build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false));
+ }
+
+ @Test
+ public void testNoStaticMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ // Create a new member which is not static
+ streamsGroup.getOrMaybeCreateMember("member", true);
+ assertNull(streamsGroup.staticMember("instance-id"));
+ }
+
+ @Test
+ public void testGetStaticMemberByInstanceId() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+ member = new StreamsGroupMember.Builder(member)
+ .setInstanceId("instance")
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(member, streamsGroup.staticMember("instance"));
+ assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", false));
+ assertEquals(member.memberId(), streamsGroup.staticMemberId("instance"));
+ }
+
+ @Test
+ public void testRemoveMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember member = streamsGroup.getOrMaybeCreateMember("member", true);
+ streamsGroup.updateMember(member);
+ assertTrue(streamsGroup.hasMember("member"));
+
+ streamsGroup.removeMember("member");
+ assertFalse(streamsGroup.hasMember("member"));
+
+ }
+
+ @Test
+ public void testRemoveStaticMember() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setInstanceId("instance")
+ .build();
+
+ streamsGroup.updateMember(member);
+ assertTrue(streamsGroup.hasMember("member"));
+
+ streamsGroup.removeMember("member");
+ assertFalse(streamsGroup.hasMember("member"));
+ assertNull(streamsGroup.staticMember("instance"));
+ assertNull(streamsGroup.staticMemberId("instance"));
+ }
+
+ @Test
+ public void testUpdatingMemberUpdatesProcessId() {
+ String fooSubtopology = "foo-sub";
+ String barSubtopology = "bar-sub";
+ String zarSubtopology = "zar-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process"),
+ streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+ member = new StreamsGroupMember.Builder(member)
+ .setProcessId("process1")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process1", streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process1", streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process1"),
+ streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(),
+ streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+ }
+
+ @Test
+ public void testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsReassignedBeforeBeingRevoked() {
+ String fooSubtopologyId = "foo-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ emptyMap(),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+
+ member = new StreamsGroupMember.Builder(member)
+ .setProcessId("process1")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ )
+ )
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process1", streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+ }
+
+ @Test
+ public void testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsNotReleased() {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(m1);
+
+ StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ emptyMap(),
+ emptyMap()
+ )
+ )
+ .build();
+
+ // m2 should not be able to acquire foo-1 because the partition is
+ // still owned by another member.
+ assertThrows(IllegalStateException.class, () -> streamsGroup.updateMember(m2));
+ }
+
+
+ @ParameterizedTest
+ @EnumSource(TaskRole.class)
+ public void testRemoveTaskProcessIds(TaskRole taskRole) {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ // Removing should fail because there is no epoch set.
+ assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+ mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+ "process"
+ ));
+
+ StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+ .setProcessId("process")
+ .setAssignedTasks(mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)))
+ .build();
+
+ streamsGroup.updateMember(m1);
+
+ // Removing should fail because the expected epoch is incorrect.
+ assertThrows(IllegalStateException.class, () -> streamsGroup.removeTaskProcessIds(
+ mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+ "process1"
+ ));
+ }
+
+ @Test
+ public void testAddTaskProcessIds() {
+ String fooSubtopologyId = "foo-sub";
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ streamsGroup.addTaskProcessId(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ ),
+ "process"
+ );
+
+ // Changing the epoch should fail because the owner of the partition
+ // should remove it first.
+ assertThrows(IllegalStateException.class, () -> streamsGroup.addTaskProcessId(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+ ),
+ "process"
+ ));
+ }
+
+ @Test
+ public void testDeletingMemberRemovesProcessId() {
+ String fooSubtopology = "foo-sub";
+ String barSubtopology = "bar-sub";
+ String zarSubtopology = "zar-sub";
+
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ StreamsGroupMember member;
+
+ member = new StreamsGroupMember.Builder("member")
+ .setProcessId("process")
+ .setAssignedTasks(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+ mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+ )
+ )
+ .setTasksPendingRevocation(
+ new TasksTuple(
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+ mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+ )
+ )
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+ assertEquals(Collections.singleton("process"), streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+ assertEquals(Collections.singleton("process"), streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+ assertEquals("process", streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+ assertEquals(Collections.singleton("process"), streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+ assertEquals(Collections.singleton("process"), streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(), streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(), streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+ streamsGroup.removeMember(member.memberId());
+
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 1));
+ assertEquals(Collections.emptySet(), streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 2));
+ assertEquals(Collections.emptySet(), streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 3));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 3));
+ assertEquals(Collections.emptySet(), streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 4));
+ assertEquals(Collections.emptySet(), streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 5));
+ assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+ assertEquals(Collections.emptySet(), streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+ assertEquals(Collections.emptySet(), streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+ }
+
+ @Test
+ public void testGroupState() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ streamsGroup.updateMember(member1);
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
+
+ streamsGroup.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
+
+ StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .build();
+
+ streamsGroup.updateMember(member2);
+ streamsGroup.setGroupEpoch(2);
+
+ assertEquals(MemberState.STABLE, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
+
+ streamsGroup.setTargetAssignmentEpoch(2);
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
+
+ member1 = new StreamsGroupMember.Builder(member1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member1);
+
+ assertEquals(MemberState.STABLE, member1.state());
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
+
+ // Member 2 is not stable so the group stays in reconciling state.
+ member2 = new StreamsGroupMember.Builder(member2)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member2);
+
+ assertEquals(MemberState.UNREVOKED_TASKS, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
+
+ member2 = new StreamsGroupMember.Builder(member2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(1)
+ .build();
+
+ streamsGroup.updateMember(member2);
+
+ assertEquals(MemberState.STABLE, member2.state());
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state());
+
+ streamsGroup.removeMember("member1");
+ streamsGroup.removeMember("member2");
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
+ }
+
+ @Test
+ public void testMetadataRefreshDeadline() {
+ MockTime time = new MockTime();
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ // Group epoch starts at 0.
+ assertEquals(0, group.groupEpoch());
+
+ // The refresh time deadline should be empty when the group is created or loaded.
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+
+ // Set the refresh deadline. The metadata remains valid because the deadline
+ // has not past and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
+
+ // Advance past the deadline. The metadata should have expired.
+ time.sleep(1001L);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+
+ // Set the refresh time deadline with a higher group epoch. The metadata is considered
+ // as expired because the group epoch attached to the deadline is higher than the
+ // current group epoch.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+ // Advance the group epoch.
+ group.setGroupEpoch(group.groupEpoch() + 1);
+
+ // Set the refresh deadline. The metadata remains valid because the deadline
+ // has not past and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
+
+ // Request metadata refresh. The metadata expires immediately.
+ group.requestMetadataRefresh();
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testValidateTransactionalOffsetCommit(short version) {
+ boolean isTransactional = true;
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+
+ // Simulate a call from the admin client without member ID and member epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0, isTransactional, version));
+
+ // Create a member.
+ group.updateMember(new StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+ // A call from the admin client should fail as the group is not empty.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", "", -1, isTransactional, version));
+
+ // The member epoch is stale.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10, isTransactional, version));
+
+ // This should succeed.
+ group.validateOffsetCommit("member-id", "", 0, isTransactional, version);
+
+ // This should succeed.
+ group.validateOffsetCommit("", null, -1, isTransactional, version);
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+ public void testValidateOffsetCommit(short version) {
+ boolean isTransactional = false;
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ // Simulate a call from the admin client without member ID and member epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0, isTransactional, version));
+
+ // Create members.
+ group.updateMember(
+ new StreamsGroupMember
+ .Builder("new-protocol-member-id").setMemberEpoch(0).build()
+ );
+
+ // A call from the admin client should fail as the group is not empty.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", "", -1, isTransactional, version));
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", null, -1, isTransactional, version));
+
+ // The member epoch is stale.
+ if (version >= 9) {
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
+ }
+
+ // This should succeed.
+ if (version >= 9) {
+ group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version);
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version));
+ }
+ }
+
+ @Test
+ public void testAsListedGroup() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup group = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ group.setGroupEpoch(1);
+ group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+ group.setTargetAssignmentEpoch(1);
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+
+ ListGroupsResponseData.ListedGroup listedGroup = group.asListedGroup(1);
+
+ assertEquals("group-foo", listedGroup.groupId());
+ assertEquals("streams", listedGroup.protocolType());
+ assertEquals("Reconciling", listedGroup.groupState());
+ assertEquals("streams", listedGroup.groupType());
+ }
+
+ @Test
+ public void testValidateOffsetFetch() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup group = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+
+ // Simulate a call from the admin client without member ID and member epoch.
+ group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
+
+ // Create a member.
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ group.updateMember(new StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+ // The member does not exist at last committed offset 0.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, 0));
+
+ // The member exists but the epoch is stale when the last committed offset is not considered.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE));
+
+ // This should succeed.
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testValidateDeleteGroup() {
+ StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+ assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+
+ StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .build();
+ streamsGroup.updateMember(member1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
+
+ streamsGroup.setTargetAssignmentEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state());
+ assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
+
+ streamsGroup.removeMember("member1");
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
+ assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+ }
+
+ @Test
+ public void testOffsetExpirationCondition() {
+ long currentTimestamp = 30000L;
+ long commitTimestamp = 20000L;
+ long offsetsRetentionMs = 10000L;
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class));
+
+ Optional offsetExpirationCondition = group.offsetExpirationCondition();
+ assertTrue(offsetExpirationCondition.isPresent());
+
+ OffsetExpirationConditionImpl condition = (OffsetExpirationConditionImpl) offsetExpirationCondition.get();
+ assertEquals(commitTimestamp, condition.baseTimestamp().apply(offsetAndMetadata));
+ assertTrue(condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));
+ }
+
+ @Test
+ public void testAsDescribedGroup() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(), group.stateAsString(0));
+
+ group.setGroupEpoch(1);
+ group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+ group.setTargetAssignmentEpoch(1);
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance1")
+ .setRackId("rack1")
+ .setClientId("client1")
+ .setClientHost("host1")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(1)
+ .setProcessId("process1")
+ .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
+ .setClientTags(Collections.singletonMap("tag1", "value1"))
+ .setAssignedTasks(new TasksTuple(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))
+ .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))
+ .build());
+ group.updateMember(new StreamsGroupMember.Builder("member2")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance2")
+ .setRackId("rack2")
+ .setClientId("client2")
+ .setClientHost("host2")
+ .setRebalanceTimeoutMs(1000)
+ .setTopologyEpoch(1)
+ .setProcessId("process2")
+ .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host2").setPort(9092))
+ .setClientTags(Collections.singletonMap("tag2", "value2"))
+ .setAssignedTasks(new TasksTuple(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))
+ .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+
+ StreamsGroupDescribeResponseData.DescribedGroup expected = new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-id-1")
+ .setGroupState(StreamsGroup.StreamsGroupState.STABLE.toString())
+ .setGroupEpoch(1)
+ .setTopology(new StreamsGroupDescribeResponseData.Topology().setEpoch(1).setSubtopologies(Collections.emptyList()))
+ .setAssignmentEpoch(1)
+ .setMembers(Arrays.asList(
+ new StreamsGroupDescribeResponseData.Member()
+ .setMemberId("member1")
+ .setMemberEpoch(1)
+ .setInstanceId("instance1")
+ .setRackId("rack1")
+ .setClientId("client1")
+ .setClientHost("host1")
+ .setTopologyEpoch(1)
+ .setProcessId("process1")
+ .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint().setHost("host1").setPort(9092))
+ .setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue().setKey("tag1").setValue("value1")))
+ .setAssignment(new StreamsGroupDescribeResponseData.Assignment())
+ .setTargetAssignment(new StreamsGroupDescribeResponseData.Assignment()),
+ new StreamsGroupDescribeResponseData.Member()
+ .setMemberId("member2")
+ .setMemberEpoch(1)
+ .setInstanceId("instance2")
+ .setRackId("rack2")
+ .setClientId("client2")
+ .setClientHost("host2")
+ .setTopologyEpoch(1)
+ .setProcessId("process2")
+ .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint().setHost("host2").setPort(9092))
+ .setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue().setKey("tag2").setValue("value2")))
+ .setAssignment(new StreamsGroupDescribeResponseData.Assignment())
+ .setTargetAssignment(new StreamsGroupDescribeResponseData.Assignment())
+ ));
+ StreamsGroupDescribeResponseData.DescribedGroup actual = group.asDescribedGroup(1);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testStateTransitionMetrics() {
+ // Confirm metrics is not updated when a new StreamsGroup is created but only when the group transitions
+ // its state.
+ GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ new SnapshotRegistry(new LogContext()),
+ "group-id",
+ metrics
+ );
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
+ verify(metrics, times(0)).onStreamsGroupStateTransition(null, StreamsGroup.StreamsGroupState.EMPTY);
+
+ StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .build();
+
+ streamsGroup.updateMember(member);
+
+ assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
+ verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.EMPTY, StreamsGroup.StreamsGroupState.NOT_READY);
+
+ streamsGroup.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+
+ assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
+ verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.NOT_READY, StreamsGroup.StreamsGroupState.RECONCILING);
+
+ streamsGroup.setGroupEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
+ verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.RECONCILING, StreamsGroup.StreamsGroupState.ASSIGNING);
+
+ streamsGroup.setTargetAssignmentEpoch(1);
+
+ assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state());
+ verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.ASSIGNING, StreamsGroup.StreamsGroupState.STABLE);
+
+ streamsGroup.removeMember("member");
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
+ verify(metrics, times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.STABLE, StreamsGroup.StreamsGroupState.EMPTY);
+ }
+
+ @Test
+ public void testIsInStatesCaseInsensitiveAndUnderscored() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
+ snapshotRegistry,
+ emptyMap(),
+ new TopicPartition("__consumer_offsets", 0)
+ );
+ StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo", metricsShard);
+ snapshotRegistry.idempotentCreateSnapshot(0);
+ assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+ assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
+
+ group.updateMember(new StreamsGroupMember.Builder("member1")
+ .build());
+ snapshotRegistry.idempotentCreateSnapshot(1);
+ assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+ assertTrue(group.isInStates(Collections.singleton("not_ready"), 1));
+ assertFalse(group.isInStates(Collections.singleton("empty"), 1));
+ }
+
+ @Test
+ public void testSetTopologyUpdatesStateAndConfiguredTopology() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);
+
+ StreamsTopology topology = new StreamsTopology(1, Collections.emptyMap());
+
+ ConfiguredTopology topo = mock(ConfiguredTopology.class);
+ when(topo.isReady()).thenReturn(true);
+
+ try (MockedStatic mocked = mockStatic(InternalTopicManager.class)) {
+ mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(Map.of()))).thenReturn(topo);
+ streamsGroup.setTopology(topology);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(Map.of())));
+ }
+
+ Optional configuredTopology = streamsGroup.configuredTopology();
+ assertTrue(configuredTopology.isPresent(), "Configured topology should be present");
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+
+ assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+ }
+
+ @Test
+ public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
+ Uuid topicUuid = Uuid.randomUuid();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "test-group", metricsShard);
+
+ assertEquals(StreamsGroup.StreamsGroupState.EMPTY, streamsGroup.state());
+
+ Map partitionMetadata = new HashMap<>();
+ partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 1));
+
+ try (MockedStatic mocked = mockStatic(InternalTopicManager.class)) {
+ streamsGroup.setPartitionMetadata(partitionMetadata);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(), any(), any()), never());
+ }
+
+ assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured topology should not be present");
+ assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+
+ StreamsTopology topology = new StreamsTopology(1, Collections.emptyMap());
+ streamsGroup.setTopology(topology);
+ ConfiguredTopology topo = mock(ConfiguredTopology.class);
+ when(topo.isReady()).thenReturn(true);
+
+ try (MockedStatic mocked = mockStatic(InternalTopicManager.class)) {
+ mocked.when(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata))).thenReturn(topo);
+ streamsGroup.setPartitionMetadata(partitionMetadata);
+ mocked.verify(() -> InternalTopicManager.configureTopics(any(), eq(topology), eq(partitionMetadata)));
+ }
+
+ Optional configuredTopology = streamsGroup.configuredTopology();
+ assertTrue(configuredTopology.isPresent(), "Configured topology should be present");
+ assertEquals(topo, configuredTopology.get());
+ assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+ assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+
+ assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+ }
+
+ @Test
+ public void testComputePartitionMetadata() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "group-foo",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ TopicsImage topicsImage = mock(TopicsImage.class);
+ TopicImage topicImage = mock(TopicImage.class);
+ when(topicImage.id()).thenReturn(Uuid.randomUuid());
+ when(topicImage.name()).thenReturn("topic1");
+ when(topicImage.partitions()).thenReturn(Collections.singletonMap(0, null));
+ when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
+ StreamsTopology topology = mock(StreamsTopology.class);
+ when(topology.requiredTopics()).thenReturn(Collections.singleton("topic1"));
+
+ Map partitionMetadata = streamsGroup.computePartitionMetadata(topicsImage, topology);
+
+ assertEquals(1, partitionMetadata.size());
+ assertTrue(partitionMetadata.containsKey("topic1"));
+ TopicMetadata topicMetadata = partitionMetadata.get("topic1");
+ assertNotNull(topicMetadata);
+ assertEquals(topicImage.id(), topicMetadata.id());
+ assertEquals("topic1", topicMetadata.name());
+ assertEquals(1, topicMetadata.numPartitions());
+ }
+
+ @Test
+ void testCreateGroupTombstoneRecords() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+ StreamsGroup streamsGroup = new StreamsGroup(
+ LOG_CONTEXT,
+ snapshotRegistry,
+ "test-group",
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+ .setMemberEpoch(1)
+ .build());
+ List records = new ArrayList<>();
+
+ streamsGroup.createGroupTombstoneRecords(records);
+
+ assertEquals(7, records.size());
+ for (CoordinatorRecord record : records) {
+ assertNotNull(record.key());
+ assertNull(record.value());
+ }
+ final Set keys = records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
+ assertTrue(keys.contains(new StreamsGroupMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new StreamsGroupTopologyKey().setGroupId("test-group")));
+ assertTrue(keys.contains(new StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
+ assertTrue(keys.contains(new StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
+ assertTrue(keys.contains(new StreamsGroupCurrentMemberAssignmentKey().setGroupId("test-group").setMemberId("member1")));
+ }
+
+ @Test
+ public void testIsSubscribedToTopic() {
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
+
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+ streamsGroup.setTopology(
+ new StreamsTopology(1,
+ Map.of("test-subtopology",
+ new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("test-subtopology")
+ .setSourceTopics(List.of("test-topic1"))
+ .setRepartitionSourceTopics(List.of(new StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
+ .setRepartitionSinkTopics(List.of("test-topic2"))
+ )
+ )
+ );
+
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+ streamsGroup.setPartitionMetadata(
+ Map.of(
+ "test-topic1", new TopicMetadata(Uuid.randomUuid(), "test-topic1", 1),
+ "test-topic2", new TopicMetadata(Uuid.randomUuid(), "test-topic2", 1)
+ )
+ );
+
+ assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
+ assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
+ assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+ }
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 114974558b8..0380d4cf5e7 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -45,7 +45,6 @@ import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
-import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
@@ -144,8 +143,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -196,8 +195,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -261,8 +260,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -341,8 +340,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -429,8 +428,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -509,8 +508,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -581,8 +580,8 @@ public class TargetAssignmentBuilderTest {
20
);
- String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
- String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+ String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
+ String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", "instance-member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2),
@@ -709,16 +708,14 @@ public class TargetAssignmentBuilderTest {
public String addSubtopologyWithSingleSourceTopic(
String topicName,
- int numTasks,
- Map> partitionRacks
+ int numTasks
) {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(
topicId,
topicName,
- numTasks,
- partitionRacks
+ numTasks
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks);
subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
index 38d63dab6ef..59712d5c954 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
@@ -21,11 +21,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadat
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,84 +30,60 @@ public class TopicMetadataTest {
@Test
public void testConstructor() {
assertDoesNotThrow(() ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3));
}
@Test
public void testConstructorWithZeroUuid() {
Exception exception = assertThrows(IllegalArgumentException.class, () ->
- new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3, new HashMap<>()));
+ new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3));
assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage());
}
@Test
public void testConstructorWithNullUuid() {
assertThrows(NullPointerException.class, () ->
- new TopicMetadata(null, "valid-topic", 3, new HashMap<>()));
+ new TopicMetadata(null, "valid-topic", 3));
}
@Test
public void testConstructorWithNullName() {
assertThrows(NullPointerException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), null, 3, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), null, 3));
}
@Test
public void testConstructorWithEmptyName() {
Exception exception = assertThrows(IllegalArgumentException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), "", 3, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "", 3));
assertEquals("Topic name cannot be empty.", exception.getMessage());
}
@Test
public void testConstructorWithZeroNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0));
assertEquals("Number of partitions must be positive.", exception.getMessage());
}
@Test
public void testConstructorWithNegativeNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1, new HashMap<>()));
+ new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
assertEquals("Number of partitions must be positive.", exception.getMessage());
}
- @Test
- public void testConstructorWithNullPartitionRacks() {
- assertThrows(NullPointerException.class, () ->
- new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, null));
- }
-
@Test
public void testFromRecord() {
StreamsGroupPartitionMetadataValue.TopicMetadata record = new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(Uuid.randomUuid())
.setTopicName("test-topic")
- .setNumPartitions(3)
- .setPartitionMetadata(List.of(
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(0)
- .setRacks(List.of("rack1", "rack2")),
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(1)
- .setRacks(List.of("rack3")),
- new StreamsGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(2)
- .setRacks(List.of("rack4", "rack5"))
- ));
+ .setNumPartitions(3);
TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
assertEquals(record.topicId(), topicMetadata.id());
assertEquals(record.topicName(), topicMetadata.name());
assertEquals(record.numPartitions(), topicMetadata.numPartitions());
-
- Map> expectedPartitionRacks = new HashMap<>();
- expectedPartitionRacks.put(0, Set.of("rack1", "rack2"));
- expectedPartitionRacks.put(1, Set.of("rack3"));
- expectedPartitionRacks.put(2, Set.of("rack4", "rack5"));
-
- assertEquals(expectedPartitionRacks, topicMetadata.partitionRacks());
}
}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index 4d7fdc08d4c..6ec600a0c38 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -55,7 +55,7 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
Map topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap()));
+ topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
// SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology();
@@ -70,10 +70,10 @@ class InternalTopicManagerTest {
@Test
void testConfigureTopics() {
Map topicMetadata = new HashMap<>();
- topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap()));
- topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2, Collections.emptyMap()));
+ topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
+ topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2));
topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
- new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2, Collections.emptyMap()));
+ new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
StreamsTopology topology = makeTestTopology();
ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);