KAFKA-18323: Add StreamsGroup class (#18729)

Implements a memory model for representing streams groups in the group coordinator, as well as group count and rebalance metrics.

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-02-12 11:01:53 +01:00 committed by GitHub
parent 63fc9b3cb8
commit c70b7c4b9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 2472 additions and 118 deletions

View File

@ -335,7 +335,7 @@
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/> files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck" <suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde).java"/> files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
<suppress checks="JavaNCSS" <suppress checks="JavaNCSS"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/> files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

View File

@ -37,6 +37,7 @@ public interface Group {
CONSUMER("consumer"), CONSUMER("consumer"),
CLASSIC("classic"), CLASSIC("classic"),
SHARE("share"), SHARE("share"),
STREAMS("streams"),
UNKNOWN("unknown"); UNKNOWN("unknown");
private final String name; private final String name;

View File

@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
@ -73,6 +74,8 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String SHARE_GROUP_COUNT_METRIC_NAME = "group-count"; public static final String SHARE_GROUP_COUNT_METRIC_NAME = "group-count";
public static final String CONSUMER_GROUP_COUNT_STATE_TAG = "state"; public static final String CONSUMER_GROUP_COUNT_STATE_TAG = "state";
public static final String SHARE_GROUP_COUNT_STATE_TAG = CONSUMER_GROUP_COUNT_STATE_TAG; public static final String SHARE_GROUP_COUNT_STATE_TAG = CONSUMER_GROUP_COUNT_STATE_TAG;
public static final String STREAMS_GROUP_COUNT_METRIC_NAME = "streams-group-count";
public static final String STREAMS_GROUP_COUNT_STATE_TAG = "state";
public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits"; public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired"; public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
@ -80,6 +83,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances";
public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances"; public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = "ConsumerGroupRebalances";
public static final String SHARE_GROUP_REBALANCES_SENSOR_NAME = "ShareGroupRebalances"; public static final String SHARE_GROUP_REBALANCES_SENSOR_NAME = "ShareGroupRebalances";
public static final String STREAMS_GROUP_REBALANCES_SENSOR_NAME = "StreamsGroupRebalances";
private final MetricName classicGroupCountMetricName; private final MetricName classicGroupCountMetricName;
private final MetricName consumerGroupCountMetricName; private final MetricName consumerGroupCountMetricName;
@ -92,6 +96,13 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
private final MetricName shareGroupCountEmptyMetricName; private final MetricName shareGroupCountEmptyMetricName;
private final MetricName shareGroupCountStableMetricName; private final MetricName shareGroupCountStableMetricName;
private final MetricName shareGroupCountDeadMetricName; private final MetricName shareGroupCountDeadMetricName;
private final MetricName streamsGroupCountMetricName;
private final MetricName streamsGroupCountEmptyMetricName;
private final MetricName streamsGroupCountAssigningMetricName;
private final MetricName streamsGroupCountReconcilingMetricName;
private final MetricName streamsGroupCountStableMetricName;
private final MetricName streamsGroupCountDeadMetricName;
private final MetricName streamsGroupCountNotReadyMetricName;
private final MetricsRegistry registry; private final MetricsRegistry registry;
private final Metrics metrics; private final Metrics metrics;
@ -106,6 +117,7 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
this(KafkaYammerMetrics.defaultRegistry(), new Metrics()); this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
} }
@SuppressWarnings("MethodLength")
public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) { public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
this.registry = Objects.requireNonNull(registry); this.registry = Objects.requireNonNull(registry);
this.metrics = Objects.requireNonNull(metrics); this.metrics = Objects.requireNonNull(metrics);
@ -190,6 +202,55 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.DEAD.toString() SHARE_GROUP_COUNT_STATE_TAG, ShareGroup.ShareGroupState.DEAD.toString()
); );
streamsGroupCountMetricName = metrics.metricName(
GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The total number of groups using the streams rebalance protocol.",
Collections.singletonMap(GROUP_COUNT_PROTOCOL_TAG, Group.GroupType.STREAMS.toString())
);
streamsGroupCountEmptyMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in empty state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.EMPTY.toString())
);
streamsGroupCountAssigningMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in assigning state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.ASSIGNING.toString())
);
streamsGroupCountReconcilingMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in reconciling state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.RECONCILING.toString())
);
streamsGroupCountStableMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in stable state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.STABLE.toString())
);
streamsGroupCountDeadMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in dead state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.DEAD.toString())
);
streamsGroupCountNotReadyMetricName = metrics.metricName(
STREAMS_GROUP_COUNT_METRIC_NAME,
METRICS_GROUP,
"The number of streams groups in not ready state.",
Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, StreamsGroupState.NOT_READY.toString())
);
registerGauges(); registerGauges();
Sensor offsetCommitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME); Sensor offsetCommitsSensor = metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
@ -248,13 +309,23 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
"The total number of share group rebalances", "The total number of share group rebalances",
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()))); SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
Sensor streamsGroupRebalanceSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
streamsGroupRebalanceSensor.add(new Meter(
metrics.metricName("streams-group-rebalance-rate",
METRICS_GROUP,
"The rate of streams group rebalances"),
metrics.metricName("streams-group-rebalance-count",
METRICS_GROUP,
"The total number of streams group rebalances")));
globalSensors = Collections.unmodifiableMap(Utils.mkMap( globalSensors = Collections.unmodifiableMap(Utils.mkMap(
Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor), Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor),
Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor), Utils.mkEntry(OFFSET_EXPIRED_SENSOR_NAME, offsetExpiredSensor),
Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor), Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicGroupCompletedRebalancesSensor), Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, classicGroupCompletedRebalancesSensor),
Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor), Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, consumerGroupRebalanceSensor),
Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, shareGroupRebalanceSensor) Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, shareGroupRebalanceSensor),
Utils.mkEntry(STREAMS_GROUP_REBALANCES_SENSOR_NAME, streamsGroupRebalanceSensor)
)); ));
} }
@ -278,6 +349,14 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
return shards.values().stream().mapToLong(shard -> shard.numConsumerGroups(state)).sum(); return shards.values().stream().mapToLong(shard -> shard.numConsumerGroups(state)).sum();
} }
private long numStreamsGroups() {
return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numStreamsGroups).sum();
}
private long numStreamsGroups(StreamsGroupState state) {
return shards.values().stream().mapToLong(shard -> shard.numStreamsGroups(state)).sum();
}
private long numShareGroups() { private long numShareGroups() {
return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numShareGroups).sum(); return shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numShareGroups).sum();
} }
@ -309,7 +388,14 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
shareGroupCountMetricName, shareGroupCountMetricName,
shareGroupCountEmptyMetricName, shareGroupCountEmptyMetricName,
shareGroupCountStableMetricName, shareGroupCountStableMetricName,
shareGroupCountDeadMetricName shareGroupCountDeadMetricName,
streamsGroupCountMetricName,
streamsGroupCountEmptyMetricName,
streamsGroupCountAssigningMetricName,
streamsGroupCountReconcilingMetricName,
streamsGroupCountStableMetricName,
streamsGroupCountDeadMetricName,
streamsGroupCountNotReadyMetricName
).forEach(metrics::removeMetric); ).forEach(metrics::removeMetric);
Arrays.asList( Arrays.asList(
@ -318,7 +404,8 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
OFFSET_DELETIONS_SENSOR_NAME, OFFSET_DELETIONS_SENSOR_NAME,
CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
CONSUMER_GROUP_REBALANCES_SENSOR_NAME, CONSUMER_GROUP_REBALANCES_SENSOR_NAME,
SHARE_GROUP_REBALANCES_SENSOR_NAME SHARE_GROUP_REBALANCES_SENSOR_NAME,
STREAMS_GROUP_REBALANCES_SENSOR_NAME
).forEach(metrics::removeSensor); ).forEach(metrics::removeSensor);
} }
@ -461,5 +548,40 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC
shareGroupCountDeadMetricName, shareGroupCountDeadMetricName,
(Gauge<Long>) (config, now) -> numShareGroups(ShareGroup.ShareGroupState.DEAD) (Gauge<Long>) (config, now) -> numShareGroups(ShareGroup.ShareGroupState.DEAD)
); );
metrics.addMetric(
streamsGroupCountMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups()
);
metrics.addMetric(
streamsGroupCountEmptyMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.EMPTY)
);
metrics.addMetric(
streamsGroupCountAssigningMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.ASSIGNING)
);
metrics.addMetric(
streamsGroupCountReconcilingMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.RECONCILING)
);
metrics.addMetric(
streamsGroupCountStableMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.STABLE)
);
metrics.addMetric(
streamsGroupCountDeadMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.DEAD)
);
metrics.addMetric(
streamsGroupCountNotReadyMetricName,
(Gauge<Long>) (config, now) -> numStreamsGroups(StreamsGroupState.NOT_READY)
);
} }
} }

View File

@ -23,6 +23,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong; import org.apache.kafka.timeline.TimelineLong;
@ -78,6 +79,11 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
*/ */
private final Map<ShareGroup.ShareGroupState, TimelineGaugeCounter> shareGroupGauges; private final Map<ShareGroup.ShareGroupState, TimelineGaugeCounter> shareGroupGauges;
/**
* Streams group size gauge counters keyed by the metric name.
*/
private final Map<StreamsGroupState, TimelineGaugeCounter> streamsGroupGauges;
/** /**
* All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards. * All sensors keyed by the sensor name. A Sensor object is shared across all metrics shards.
*/ */
@ -119,6 +125,21 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))) new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
); );
this.streamsGroupGauges = Utils.mkMap(
Utils.mkEntry(StreamsGroupState.EMPTY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(StreamsGroupState.ASSIGNING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(StreamsGroupState.RECONCILING,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(StreamsGroupState.STABLE,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(StreamsGroupState.DEAD,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0))),
Utils.mkEntry(StreamsGroupState.NOT_READY,
new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)))
);
this.globalSensors = Objects.requireNonNull(globalSensors); this.globalSensors = Objects.requireNonNull(globalSensors);
this.topicPartition = Objects.requireNonNull(topicPartition); this.topicPartition = Objects.requireNonNull(topicPartition);
} }
@ -144,6 +165,20 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
this.consumerGroupGauges = consumerGroupGauges; this.consumerGroupGauges = consumerGroupGauges;
} }
/**
* Increment the number of streams groups.
*
* @param state the streams group state.
*/
public void incrementNumStreamsGroups(StreamsGroupState state) {
TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.increment();
}
}
}
/** /**
* Decrement the number of offsets. * Decrement the number of offsets.
*/ */
@ -153,6 +188,20 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
} }
} }
/**
* Decrement the number of streams groups.
*
* @param state the streams group state.
*/
public void decrementNumStreamsGroups(StreamsGroupState state) {
TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
if (gaugeCounter != null) {
synchronized (gaugeCounter.timelineLong) {
gaugeCounter.timelineLong.decrement();
}
}
}
/** /**
* @return The number of offsets. * @return The number of offsets.
*/ */
@ -206,6 +255,29 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
.mapToLong(Long::longValue).sum(); .mapToLong(Long::longValue).sum();
} }
/**
* Obtain the number of streams groups in the specified state.
*
* @param state the streams group state.
*
* @return The number of streams groups in `state`.
*/
public long numStreamsGroups(StreamsGroupState state) {
TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
if (gaugeCounter != null) {
return gaugeCounter.atomicLong.get();
}
return 0L;
}
/**
* @return The total number of streams groups.
*/
public long numStreamsGroups() {
return streamsGroupGauges.values().stream()
.mapToLong(timelineGaugeCounter -> timelineGaugeCounter.atomicLong.get()).sum();
}
@Override @Override
public void record(String sensorName) { public void record(String sensorName) {
Sensor sensor = globalSensors.get(sensorName); Sensor sensor = globalSensors.get(sensorName);
@ -246,6 +318,14 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
} }
gaugeCounter.atomicLong.set(value); gaugeCounter.atomicLong.set(value);
}); });
this.streamsGroupGauges.forEach((__, gaugeCounter) -> {
long value;
synchronized (gaugeCounter.timelineLong) {
value = gaugeCounter.timelineLong.get(offset);
}
gaugeCounter.atomicLong.set(value);
});
} }
/** /**
@ -330,4 +410,66 @@ public class GroupCoordinatorMetricsShard implements CoordinatorMetricsShard {
} }
} }
} }
/**
* Called when a streams group's state has changed. Increment/decrement
* the counter accordingly.
*
* @param oldState The previous state. null value means that it's a new group.
* @param newState The next state. null value means that the group has been removed.
*/
public void onStreamsGroupStateTransition(
StreamsGroupState oldState,
StreamsGroupState newState
) {
if (newState != null) {
switch (newState) {
case EMPTY:
incrementNumStreamsGroups(StreamsGroupState.EMPTY);
break;
case NOT_READY:
incrementNumStreamsGroups(StreamsGroupState.NOT_READY);
break;
case ASSIGNING:
incrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
break;
case RECONCILING:
incrementNumStreamsGroups(StreamsGroupState.RECONCILING);
break;
case STABLE:
incrementNumStreamsGroups(StreamsGroupState.STABLE);
break;
case DEAD:
incrementNumStreamsGroups(StreamsGroupState.DEAD);
break;
default:
throw new IllegalArgumentException("Unknown new state for streams group: " + newState);
}
}
if (oldState != null) {
switch (oldState) {
case EMPTY:
decrementNumStreamsGroups(StreamsGroupState.EMPTY);
break;
case NOT_READY:
decrementNumStreamsGroups(StreamsGroupState.NOT_READY);
break;
case ASSIGNING:
decrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
break;
case RECONCILING:
decrementNumStreamsGroups(StreamsGroupState.RECONCILING);
break;
case STABLE:
decrementNumStreamsGroups(StreamsGroupState.STABLE);
break;
case DEAD:
decrementNumStreamsGroups(StreamsGroupState.DEAD);
break;
default:
throw new IllegalArgumentException("Unknown old state for streams group: " + newState);
}
}
}
} }

View File

@ -26,7 +26,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@ -116,21 +115,10 @@ public class StreamsCoordinatorRecordHelpers {
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
newPartitionMetadata.forEach((topicName, topicMetadata) -> { newPartitionMetadata.forEach((topicName, topicMetadata) -> {
List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(racks.stream().sorted().toList())
)
);
}
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id()) .setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name()) .setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions()) .setNumPartitions(topicMetadata.numPartitions())
.setPartitionMetadata(partitionMetadata)
); );
}); });

View File

@ -19,10 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
/** /**
* Immutable topic metadata, representing the current state of a topic in the broker. * Immutable topic metadata, representing the current state of a topic in the broker.
@ -30,15 +27,12 @@ import java.util.Set;
* @param id The topic ID. * @param id The topic ID.
* @param name The topic name. * @param name The topic name.
* @param numPartitions The number of partitions. * @param numPartitions The number of partitions.
* @param partitionRacks Map of every partition ID to a set of its rack IDs, if they exist. If rack information is unavailable for all
* partitions, this is an empty map.
*/ */
public record TopicMetadata(Uuid id, String name, int numPartitions, Map<Integer, Set<String>> partitionRacks) { public record TopicMetadata(Uuid id, String name, int numPartitions) {
public TopicMetadata(Uuid id, public TopicMetadata(Uuid id,
String name, String name,
int numPartitions, int numPartitions) {
Map<Integer, Set<String>> partitionRacks) {
this.id = Objects.requireNonNull(id); this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) { if (Uuid.ZERO_UUID.equals(id)) {
throw new IllegalArgumentException("Topic id cannot be ZERO_UUID."); throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
@ -51,23 +45,12 @@ public record TopicMetadata(Uuid id, String name, int numPartitions, Map<Integer
if (numPartitions <= 0) { if (numPartitions <= 0) {
throw new IllegalArgumentException("Number of partitions must be positive."); throw new IllegalArgumentException("Number of partitions must be positive.");
} }
this.partitionRacks = Objects.requireNonNull(partitionRacks);
} }
public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) { public static TopicMetadata fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
// Converting the data type from a list stored in the record to a map for the topic metadata.
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
for (StreamsGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) {
partitionRacks.put(
partitionMetadata.partition(),
Set.copyOf(partitionMetadata.racks())
);
}
return new TopicMetadata( return new TopicMetadata(
record.topicId(), record.topicId(),
record.topicName(), record.topicName(),
record.numPartitions(), record.numPartitions());
partitionRacks);
} }
} }

View File

@ -28,14 +28,7 @@
{ "name": "TopicName", "versions": "0+", "type": "string", { "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." }, "about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32", { "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." }, "about": "The number of partitions of the topic." }
{ "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
"about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [
{ "name": "Partition", "versions": "0+", "type": "int32",
"about": "The partition number." },
{ "name": "Racks", "versions": "0+", "type": "[]string",
"about": "The set of racks that the partition is mapped to." }
]}
]} ]}
] ]
} }

View File

@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.core.MetricsRegistry;
@ -47,6 +48,7 @@ import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue; import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual; import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual;
import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName; import static org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
@ -129,7 +131,37 @@ public class GroupCoordinatorMetricsTest {
GroupCoordinatorMetrics.METRICS_GROUP, GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in dead state.", "The number of share groups in dead state.",
"protocol", Group.GroupType.SHARE.toString(), "protocol", Group.GroupType.SHARE.toString(),
"state", GroupState.DEAD.toString()) "state", GroupState.DEAD.toString()),
metrics.metricName(
"group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("protocol", Group.GroupType.STREAMS.toString())),
metrics.metricName("streams-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("streams-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.EMPTY.toString())),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.ASSIGNING.toString())),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.RECONCILING.toString())),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.STABLE.toString())),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.DEAD.toString())),
metrics.metricName(
"streams-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Collections.singletonMap("state", StreamsGroupState.NOT_READY.toString()))
)); ));
try { try {
@ -145,7 +177,7 @@ public class GroupCoordinatorMetricsTest {
)); ));
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry); assertMetricsForTypeEqual(registry, "kafka.coordinator.group", expectedRegistry);
expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName))); expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName), metricName + " is missing"));
} }
assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet()); assertMetricsForTypeEqual(registry, "kafka.coordinator.group", Collections.emptySet());
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName))); expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
@ -196,6 +228,10 @@ public class GroupCoordinatorMetricsTest {
IntStream.range(0, 5).forEach(__ -> shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY)); IntStream.range(0, 5).forEach(__ -> shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY));
IntStream.range(0, 3).forEach(__ -> shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD)); IntStream.range(0, 3).forEach(__ -> shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD));
IntStream.range(0, 5).forEach(__ -> shard0.incrementNumStreamsGroups(StreamsGroupState.STABLE));
IntStream.range(0, 5).forEach(__ -> shard1.incrementNumStreamsGroups(StreamsGroupState.EMPTY));
IntStream.range(0, 3).forEach(__ -> shard1.decrementNumStreamsGroups(StreamsGroupState.DEAD));
assertEquals(4, shard0.numClassicGroups()); assertEquals(4, shard0.numClassicGroups());
assertEquals(5, shard1.numClassicGroups()); assertEquals(5, shard1.numClassicGroups());
assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroups"), 9); assertGaugeValue(registry, metricName("GroupMetadataManager", "NumGroups"), 9);
@ -228,6 +264,14 @@ public class GroupCoordinatorMetricsTest {
metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "share")), metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "share")),
7 7
); );
assertEquals(5, shard0.numStreamsGroups());
assertEquals(2, shard1.numStreamsGroups());
assertGaugeValue(
metrics,
metrics.metricName("group-count", METRICS_GROUP, Collections.singletonMap("protocol", "streams")),
7
);
} }
@Test @Test
@ -269,6 +313,18 @@ public class GroupCoordinatorMetricsTest {
"The total number of share group rebalances", "The total number of share group rebalances",
"protocol", "share" "protocol", "share"
), 50); ), 50);
shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);
assertMetricValue(metrics, metrics.metricName(
"streams-group-rebalance-rate",
GroupCoordinatorMetrics.METRICS_GROUP,
"The rate of streams group rebalances"
), 5.0 / 3.0);
assertMetricValue(metrics, metrics.metricName(
"streams-group-rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP,
"The total number of streams group rebalances"
), 50);
} }
private void assertMetricValue(Metrics metrics, MetricName metricName, double val) { private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {

View File

@ -257,8 +257,8 @@ class StreamsCoordinatorRecordHelpersTest {
Uuid uuid1 = Uuid.randomUuid(); Uuid uuid1 = Uuid.randomUuid();
Uuid uuid2 = Uuid.randomUuid(); Uuid uuid2 = Uuid.randomUuid();
Map<String, TopicMetadata> newPartitionMetadata = Map.of( Map<String, TopicMetadata> newPartitionMetadata = Map.of(
TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0, Set.of(RACK_1, RACK_2))), TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1, Set.of(RACK_3))) TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
); );
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue(); StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
@ -266,21 +266,11 @@ class StreamsCoordinatorRecordHelpersTest {
.setTopicId(uuid1) .setTopicId(uuid1)
.setTopicName(TOPIC_1) .setTopicName(TOPIC_1)
.setNumPartitions(1) .setNumPartitions(1)
.setPartitionMetadata(List.of(
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(0)
.setRacks(List.of(RACK_1, RACK_2))
))
); );
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata() value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid2) .setTopicId(uuid2)
.setTopicName(TOPIC_2) .setTopicName(TOPIC_2)
.setNumPartitions(2) .setNumPartitions(2)
.setPartitionMetadata(List.of(
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(1)
.setRacks(List.of(RACK_3))
))
); );
CoordinatorRecord expectedRecord = CoordinatorRecord.record( CoordinatorRecord expectedRecord = CoordinatorRecord.record(

View File

@ -45,7 +45,6 @@ import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals; import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec; import static org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
@ -144,8 +143,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3), mkTasks(fooSubtopologyId, 1, 2, 3),
@ -196,8 +195,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3), mkTasks(fooSubtopologyId, 1, 2, 3),
@ -261,8 +260,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3), mkTasks(fooSubtopologyId, 1, 2, 3),
@ -341,8 +340,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2, 3), mkTasks(fooSubtopologyId, 1, 2, 3),
@ -429,8 +428,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6)); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6)); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2), mkTasks(fooSubtopologyId, 1, 2),
@ -509,8 +508,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2), mkTasks(fooSubtopologyId, 1, 2),
@ -581,8 +580,8 @@ public class TargetAssignmentBuilderTest {
20 20
); );
String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of()); String fooSubtopologyId = context.addSubtopologyWithSingleSourceTopic("foo", 6);
String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of()); String barSubtopologyId = context.addSubtopologyWithSingleSourceTopic("bar", 6);
context.addGroupMember("member-1", "instance-member-1", mkTasksTuple(taskRole, context.addGroupMember("member-1", "instance-member-1", mkTasksTuple(taskRole,
mkTasks(fooSubtopologyId, 1, 2), mkTasks(fooSubtopologyId, 1, 2),
@ -709,16 +708,14 @@ public class TargetAssignmentBuilderTest {
public String addSubtopologyWithSingleSourceTopic( public String addSubtopologyWithSingleSourceTopic(
String topicName, String topicName,
int numTasks, int numTasks
Map<Integer, Set<String>> partitionRacks
) { ) {
String subtopologyId = Uuid.randomUuid().toString(); String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata( subscriptionMetadata.put(topicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(
topicId, topicId,
topicName, topicName,
numTasks, numTasks
partitionRacks
)); ));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks); topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks);
subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of())); subtopologies.put(subtopologyId, new ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), Map.of()));

View File

@ -21,11 +21,6 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadat
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@ -35,84 +30,60 @@ public class TopicMetadataTest {
@Test @Test
public void testConstructor() { public void testConstructor() {
assertDoesNotThrow(() -> assertDoesNotThrow(() ->
new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, new HashMap<>())); new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3));
} }
@Test @Test
public void testConstructorWithZeroUuid() { public void testConstructorWithZeroUuid() {
Exception exception = assertThrows(IllegalArgumentException.class, () -> Exception exception = assertThrows(IllegalArgumentException.class, () ->
new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3, new HashMap<>())); new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3));
assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage()); assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage());
} }
@Test @Test
public void testConstructorWithNullUuid() { public void testConstructorWithNullUuid() {
assertThrows(NullPointerException.class, () -> assertThrows(NullPointerException.class, () ->
new TopicMetadata(null, "valid-topic", 3, new HashMap<>())); new TopicMetadata(null, "valid-topic", 3));
} }
@Test @Test
public void testConstructorWithNullName() { public void testConstructorWithNullName() {
assertThrows(NullPointerException.class, () -> assertThrows(NullPointerException.class, () ->
new TopicMetadata(Uuid.randomUuid(), null, 3, new HashMap<>())); new TopicMetadata(Uuid.randomUuid(), null, 3));
} }
@Test @Test
public void testConstructorWithEmptyName() { public void testConstructorWithEmptyName() {
Exception exception = assertThrows(IllegalArgumentException.class, () -> Exception exception = assertThrows(IllegalArgumentException.class, () ->
new TopicMetadata(Uuid.randomUuid(), "", 3, new HashMap<>())); new TopicMetadata(Uuid.randomUuid(), "", 3));
assertEquals("Topic name cannot be empty.", exception.getMessage()); assertEquals("Topic name cannot be empty.", exception.getMessage());
} }
@Test @Test
public void testConstructorWithZeroNumPartitions() { public void testConstructorWithZeroNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, () -> Exception exception = assertThrows(IllegalArgumentException.class, () ->
new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0, new HashMap<>())); new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0));
assertEquals("Number of partitions must be positive.", exception.getMessage()); assertEquals("Number of partitions must be positive.", exception.getMessage());
} }
@Test @Test
public void testConstructorWithNegativeNumPartitions() { public void testConstructorWithNegativeNumPartitions() {
Exception exception = assertThrows(IllegalArgumentException.class, () -> Exception exception = assertThrows(IllegalArgumentException.class, () ->
new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1, new HashMap<>())); new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
assertEquals("Number of partitions must be positive.", exception.getMessage()); assertEquals("Number of partitions must be positive.", exception.getMessage());
} }
@Test
public void testConstructorWithNullPartitionRacks() {
assertThrows(NullPointerException.class, () ->
new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, null));
}
@Test @Test
public void testFromRecord() { public void testFromRecord() {
StreamsGroupPartitionMetadataValue.TopicMetadata record = new StreamsGroupPartitionMetadataValue.TopicMetadata() StreamsGroupPartitionMetadataValue.TopicMetadata record = new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(Uuid.randomUuid()) .setTopicId(Uuid.randomUuid())
.setTopicName("test-topic") .setTopicName("test-topic")
.setNumPartitions(3) .setNumPartitions(3);
.setPartitionMetadata(List.of(
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(0)
.setRacks(List.of("rack1", "rack2")),
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(1)
.setRacks(List.of("rack3")),
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(2)
.setRacks(List.of("rack4", "rack5"))
));
TopicMetadata topicMetadata = TopicMetadata.fromRecord(record); TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
assertEquals(record.topicId(), topicMetadata.id()); assertEquals(record.topicId(), topicMetadata.id());
assertEquals(record.topicName(), topicMetadata.name()); assertEquals(record.topicName(), topicMetadata.name());
assertEquals(record.numPartitions(), topicMetadata.numPartitions()); assertEquals(record.numPartitions(), topicMetadata.numPartitions());
Map<Integer, Set<String>> expectedPartitionRacks = new HashMap<>();
expectedPartitionRacks.put(0, Set.of("rack1", "rack2"));
expectedPartitionRacks.put(1, Set.of("rack3"));
expectedPartitionRacks.put(2, Set.of("rack4", "rack5"));
assertEquals(expectedPartitionRacks, topicMetadata.partitionRacks());
} }
} }

View File

@ -55,7 +55,7 @@ class InternalTopicManagerTest {
@Test @Test
void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() { void testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>(); Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap())); topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
// SOURCE_TOPIC_2 is missing from topicMetadata // SOURCE_TOPIC_2 is missing from topicMetadata
StreamsTopology topology = makeTestTopology(); StreamsTopology topology = makeTestTopology();
@ -70,10 +70,10 @@ class InternalTopicManagerTest {
@Test @Test
void testConfigureTopics() { void testConfigureTopics() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>(); Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2, Collections.emptyMap())); topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_1, 2));
topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2, Collections.emptyMap())); topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), SOURCE_TOPIC_2, 2));
topicMetadata.put(STATE_CHANGELOG_TOPIC_2, topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2, Collections.emptyMap())); new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
StreamsTopology topology = makeTestTopology(); StreamsTopology topology = makeTestTopology();
ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata); ConfiguredTopology configuredTopology = InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);