KAFKA-16741: Add ShareGroupHeartbeat API support - 2/N (KIP-932) (#16573)

ShareGroupHeartbeat API support as defined in KIP-932. The heartbeat persists Group and Member information on __consumer_offsets topic.

The PR also moves some of the ShareGroupConfigs to GroupCoordinatorConfigs as they should only be used in group coordinator.


Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Apoorv Mittal 2024-07-15 11:44:55 +01:00 committed by GitHub
parent 15eb555b03
commit 0b6086ed88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 2345 additions and 217 deletions

View File

@ -348,7 +348,7 @@
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/> files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck" <suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/> files="(RecordHelpersTest|CoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS" <suppress checks="JavaNCSS"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/> files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

View File

@ -1128,19 +1128,19 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // ignore string case GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG => // ignore string
/** Share groups configs */ /** Share groups configs */
case ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
@ -1997,16 +1997,16 @@ class KafkaConfigTest {
props.putAll(kraftProps()) props.putAll(kraftProps())
// Max should be greater than or equals to min. // Max should be greater than or equals to min.
props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20") props.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10") props.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "10")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
// The timeout should be within the min-max range. // The timeout should be within the min-max range.
props.put(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") props.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20") props.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5") props.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "5")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25") props.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, "25")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
} }
@ -2016,16 +2016,16 @@ class KafkaConfigTest {
props.putAll(kraftProps()) props.putAll(kraftProps())
// Max should be greater than or equals to min. // Max should be greater than or equals to min.
props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20") props.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10") props.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
// The timeout should be within the min-max range. // The timeout should be within the min-max range.
props.put(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10") props.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "10")
props.put(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20") props.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, "20")
props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5") props.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "5")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25") props.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "25")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
} }

View File

@ -37,9 +37,13 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
@ -188,6 +192,22 @@ public class CoordinatorRecordHelpers {
String groupId, String groupId,
int newGroupEpoch int newGroupEpoch
) { ) {
return newGroupEpochRecord(groupId, newGroupEpoch, GroupType.CONSUMER);
}
/**
* Creates a ConsumerGroupMetadata record.
*
* @param groupId The consumer group id.
* @param newGroupEpoch The consumer group epoch.
* @return The record.
*/
public static CoordinatorRecord newGroupEpochRecord(
String groupId,
int newGroupEpoch,
GroupType groupType
) {
if (groupType == GroupType.CONSUMER) {
return new CoordinatorRecord( return new CoordinatorRecord(
new ApiMessageAndVersion( new ApiMessageAndVersion(
new ConsumerGroupMetadataKey() new ConsumerGroupMetadataKey()
@ -200,6 +220,21 @@ public class CoordinatorRecordHelpers {
(short) 0 (short) 0
) )
); );
} else if (groupType == GroupType.SHARE) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ShareGroupMetadataKey()
.setGroupId(groupId),
(short) 11
),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(newGroupEpoch),
(short) 0
)
);
}
throw new IllegalArgumentException("Unsupported group type: " + groupType);
} }
/** /**
@ -382,6 +417,35 @@ public class CoordinatorRecordHelpers {
); );
} }
/**
* Creates a ConsumerGroupCurrentMemberAssignment record.
*
* @param groupId The consumer group id.
* @param member The share group member.
* @return The record.
*/
public static CoordinatorRecord newCurrentAssignmentRecord(
String groupId,
ShareGroupMember member
) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ConsumerGroupCurrentMemberAssignmentKey()
.setGroupId(groupId)
.setMemberId(member.memberId()),
(short) 8
),
new ApiMessageAndVersion(
new ConsumerGroupCurrentMemberAssignmentValue()
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setState(member.state().value())
.setAssignedPartitions(toTopicPartitions(member.assignedPartitions())),
(short) 0
)
);
}
/** /**
* Creates a ConsumerGroupCurrentMemberAssignment tombstone. * Creates a ConsumerGroupCurrentMemberAssignment tombstone.
* *
@ -576,6 +640,59 @@ public class CoordinatorRecordHelpers {
); );
} }
/**
* Creates a ShareGroupMemberMetadata record.
*
* @param groupId The consumer group id.
* @param member The consumer group member.
* @return The record.
*/
public static CoordinatorRecord newShareGroupMemberSubscriptionRecord(
String groupId,
ShareGroupMember member
) {
List<String> topicNames = new ArrayList<>(member.subscribedTopicNames());
Collections.sort(topicNames);
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ShareGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(member.memberId()),
(short) 10
),
new ApiMessageAndVersion(
new ShareGroupMemberMetadataValue()
.setRackId(member.rackId())
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setSubscribedTopicNames(topicNames),
(short) 0
)
);
}
/**
* Creates a ShareGroupMemberMetadata tombstone.
*
* @param groupId The share group id.
* @param memberId The share group member id.
* @return The record.
*/
public static CoordinatorRecord newShareGroupMemberSubscriptionTombstoneRecord(
String groupId,
String memberId
) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ShareGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(memberId),
(short) 10
),
null // Tombstone.
);
}
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions( private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions Map<Uuid, Set<Integer>> topicPartitions
) { ) {

View File

@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST; import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
@ -129,6 +130,35 @@ public class GroupCoordinatorConfig {
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
/** Share group configs */
public static final String SHARE_GROUP_MAX_SIZE_CONFIG = "group.share.max.size";
public static final short SHARE_GROUP_MAX_SIZE_DEFAULT = 200;
public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number of members that a single share group can accommodate.";
public static final String SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.share.session.timeout.ms";
public static final int SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String SHARE_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the share group protocol.";
public static final String SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.share.min.session.timeout.ms";
public static final int SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for share group members.";
public static final String SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.share.max.session.timeout.ms";
public static final int SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000;
public static final String SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for share group members.";
public static final String SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.heartbeat.interval.ms";
public static final int SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members of a share group.";
public static final String SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.min.heartbeat.interval.ms";
public static final int SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum heartbeat interval for share group members.";
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.max.heartbeat.interval.ms";
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes";
public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit.";
@ -211,6 +241,14 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.defineInternal(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC); .defineInternal(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef()
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC);
/** /**
* The timeout used to wait for a new member in milliseconds. * The timeout used to wait for a new member in milliseconds.
@ -242,6 +280,14 @@ public class GroupCoordinatorConfig {
private final int consumerGroupMaxSessionTimeoutMs; private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs; private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs; private final int consumerGroupMaxHeartbeatIntervalMs;
// Share group configurations
private final short shareGroupMaxSize;
private final int shareGroupSessionTimeoutMs;
private final int shareGroupMinSessionTimeoutMs;
private final int shareGroupMaxSessionTimeoutMs;
private final int shareGroupHeartbeatIntervalMs;
private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs;
public GroupCoordinatorConfig(AbstractConfig config) { public GroupCoordinatorConfig(AbstractConfig config) {
this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG); this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
@ -273,6 +319,14 @@ public class GroupCoordinatorConfig {
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
// Share group configurations
this.shareGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.shareGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.shareGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.shareGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getShort(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)); String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG));
@ -291,6 +345,26 @@ public class GroupCoordinatorConfig {
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs, require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
// Share group configs validation.
require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equals to %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
} }
/** /**
@ -492,4 +566,53 @@ public class GroupCoordinatorConfig {
public int consumerGroupMaxHeartbeatIntervalMs() { public int consumerGroupMaxHeartbeatIntervalMs() {
return consumerGroupMaxHeartbeatIntervalMs; return consumerGroupMaxHeartbeatIntervalMs;
} }
/**
* The share group session timeout in milliseconds.
*/
public int shareGroupSessionTimeoutMs() {
return shareGroupSessionTimeoutMs;
}
/**
* The consumer group heartbeat interval in milliseconds.
*/
public int shareGroupHeartbeatIntervalMs() {
return shareGroupHeartbeatIntervalMs;
}
/**
* The share group maximum size.
*/
public int shareGroupMaxSize() {
return shareGroupMaxSize;
}
/**
* The minimum allowed session timeout for registered share consumers.
*/
public int shareGroupMinSessionTimeoutMs() {
return shareGroupMinSessionTimeoutMs;
}
/**
* The maximum allowed session timeout for registered share consumers.
*/
public int shareGroupMaxSessionTimeoutMs() {
return shareGroupMaxSessionTimeoutMs;
}
/**
* The minimum heartbeat interval for registered share consumers.
*/
public int shareGroupMinHeartbeatIntervalMs() {
return shareGroupMinHeartbeatIntervalMs;
}
/**
* The maximum heartbeat interval for registered share consumers.
*/
public int shareGroupMaxHeartbeatIntervalMs() {
return shareGroupMaxHeartbeatIntervalMs;
}
} }

View File

@ -319,8 +319,25 @@ public class GroupCoordinatorService implements GroupCoordinator {
RequestContext context, RequestContext context,
ShareGroupHeartbeatRequestData request ShareGroupHeartbeatRequestData request
) { ) {
// TODO: Implement this method as part of KIP-932. if (!isActive.get()) {
throw new UnsupportedOperationException(); return CompletableFuture.completedFuture(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
);
}
return runtime.scheduleWriteOperation(
"share-group-heartbeat",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.shareGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"share-group-heartbeat",
request,
exception,
(error, message) -> new ShareGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setErrorMessage(message)
));
} }
/** /**

View File

@ -36,11 +36,12 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TransactionResult;
@ -62,6 +63,10 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
@ -191,6 +196,9 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs()) .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs())
.withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs()) .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs())
.withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy()) .withConsumerGroupMigrationPolicy(config.consumerGroupMigrationPolicy())
.withShareGroupMaxSize(config.shareGroupMaxSize())
.withShareGroupSessionTimeout(config.shareGroupSessionTimeoutMs())
.withShareGroupHeartbeatInterval(config.shareGroupHeartbeatIntervalMs())
.withGroupCoordinatorMetricsShard(metricsShard) .withGroupCoordinatorMetricsShard(metricsShard)
.build(); .build();
@ -308,6 +316,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return groupMetadataManager.consumerGroupHeartbeat(context, request); return groupMetadataManager.consumerGroupHeartbeat(context, request);
} }
/**
* Handles a ShareGroupHeartbeat request.
*
* @param context The request context.
* @param request The actual ShareGroupHeartbeat request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
*/
public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) {
return groupMetadataManager.shareGroupHeartbeat(context, request);
}
/** /**
* Handles a JoinGroup request. * Handles a JoinGroup request.
* *
@ -657,17 +681,6 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
offsetMetadataManager.onNewMetadataImage(newImage, delta); offsetMetadataManager.onNewMetadataImage(newImage, delta);
} }
/**
* @return The ApiMessage or null.
*/
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
} else {
return apiMessageAndVersion.message();
}
}
/** /**
* Replays the Record to update the hard state of the group coordinator. * Replays the Record to update the hard state of the group coordinator.
* *
@ -694,56 +707,70 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
offset, offset,
producerId, producerId,
(OffsetCommitKey) key.message(), (OffsetCommitKey) key.message(),
(OffsetCommitValue) messageOrNull(value) (OffsetCommitValue) Utils.messageOrNull(value)
); );
break; break;
case 2: case 2:
groupMetadataManager.replay( groupMetadataManager.replay(
(GroupMetadataKey) key.message(), (GroupMetadataKey) key.message(),
(GroupMetadataValue) messageOrNull(value) (GroupMetadataValue) Utils.messageOrNull(value)
); );
break; break;
case 3: case 3:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupMetadataKey) key.message(), (ConsumerGroupMetadataKey) key.message(),
(ConsumerGroupMetadataValue) messageOrNull(value) (ConsumerGroupMetadataValue) Utils.messageOrNull(value)
); );
break; break;
case 4: case 4:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupPartitionMetadataKey) key.message(), (ConsumerGroupPartitionMetadataKey) key.message(),
(ConsumerGroupPartitionMetadataValue) messageOrNull(value) (ConsumerGroupPartitionMetadataValue) Utils.messageOrNull(value)
); );
break; break;
case 5: case 5:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupMemberMetadataKey) key.message(), (ConsumerGroupMemberMetadataKey) key.message(),
(ConsumerGroupMemberMetadataValue) messageOrNull(value) (ConsumerGroupMemberMetadataValue) Utils.messageOrNull(value)
); );
break; break;
case 6: case 6:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupTargetAssignmentMetadataKey) key.message(), (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
(ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value) (ConsumerGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value)
); );
break; break;
case 7: case 7:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupTargetAssignmentMemberKey) key.message(), (ConsumerGroupTargetAssignmentMemberKey) key.message(),
(ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value) (ConsumerGroupTargetAssignmentMemberValue) Utils.messageOrNull(value)
); );
break; break;
case 8: case 8:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupCurrentMemberAssignmentKey) key.message(), (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
(ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value) (ConsumerGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value)
);
break;
case 10:
groupMetadataManager.replay(
(ShareGroupMemberMetadataKey) key.message(),
(ShareGroupMemberMetadataValue) Utils.messageOrNull(value)
);
break;
case 11:
groupMetadataManager.replay(
(ShareGroupMetadataKey) key.message(),
(ShareGroupMetadataValue) Utils.messageOrNull(value)
); );
break; break;

View File

@ -20,9 +20,11 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription; import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage; import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -191,4 +193,15 @@ public class Utils {
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId, ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
topicPartitions -> Collections.unmodifiableSet(new HashSet<>(topicPartitions.partitions())))); topicPartitions -> Collections.unmodifiableSet(new HashSet<>(topicPartitions.partitions()))));
} }
/**
* @return The ApiMessage or null.
*/
public static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
} else {
return apiMessageAndVersion.message();
}
}
} }

View File

@ -38,7 +38,7 @@ import java.util.Set;
*/ */
public class ShareGroup extends ModernGroup<ShareGroupMember> { public class ShareGroup extends ModernGroup<ShareGroupMember> {
private static final String PROTOCOL_TYPE = "share"; public static final String PROTOCOL_TYPE = "share";
public enum ShareGroupState { public enum ShareGroupState {
EMPTY("Empty"), EMPTY("Empty"),
@ -144,7 +144,9 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
String.format("Member %s is not a member of group %s.", memberId, groupId)); String.format("Member %s is not a member of group %s.", memberId, groupId));
} }
return new ShareGroupMember.Builder(memberId).build(); member = new ShareGroupMember.Builder(memberId).build();
updateMember(member);
return member;
} }
/** /**

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import java.util.Objects;
/**
* The ShareGroupAssignmentBuilder class encapsulates the reconciliation engine of the
* share group protocol. Given the current state of a member and a desired or target
* assignment state, the state machine takes the necessary steps to converge them.
*/
public class ShareGroupAssignmentBuilder {
/**
* The share group member which is reconciled.
*/
private final ShareGroupMember member;
/**
* The target assignment epoch.
*/
private int targetAssignmentEpoch;
/**
* The target assignment.
*/
private Assignment targetAssignment;
/**
* Constructs the ShareGroupAssignmentBuilder based on the current state of the
* provided share group member.
*
* @param member The share group member that must be reconciled.
*/
public ShareGroupAssignmentBuilder(ShareGroupMember member) {
this.member = Objects.requireNonNull(member);
}
/**
* Sets the target assignment epoch and the target assignment that the
* share group member must be reconciled to.
*
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @return This object.
*/
public ShareGroupAssignmentBuilder withTargetAssignment(
int targetAssignmentEpoch,
Assignment targetAssignment
) {
this.targetAssignmentEpoch = targetAssignmentEpoch;
this.targetAssignment = Objects.requireNonNull(targetAssignment);
return this;
}
/**
* Builds the next state for the member or keep the current one if it
* is not possible to move forward with the current state.
*
* @return A new ShareGroupMember or the current one.
*/
public ShareGroupMember build() {
// A new target assignment has been installed, we need to restart
// the reconciliation loop from the beginning.
if (targetAssignmentEpoch != member.memberEpoch()) {
// We transition to the target epoch. The transition to the new state is done
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
.setAssignedPartitions(targetAssignment.partitions())
.updateMemberEpoch(targetAssignmentEpoch)
.build();
}
return member;
}
}

View File

@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember; import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.Builder;
import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage; import org.apache.kafka.image.TopicsImage;
@ -81,6 +84,13 @@ public class ShareGroupMember extends ModernGroupMember {
this.assignedPartitions = member.assignedPartitions; this.assignedPartitions = member.assignedPartitions;
} }
public Builder updateMemberEpoch(int memberEpoch) {
int currentMemberEpoch = this.memberEpoch;
this.memberEpoch = memberEpoch;
this.previousMemberEpoch = currentMemberEpoch;
return this;
}
public Builder setMemberEpoch(int memberEpoch) { public Builder setMemberEpoch(int memberEpoch) {
this.memberEpoch = memberEpoch; this.memberEpoch = memberEpoch;
return this; return this;
@ -139,6 +149,14 @@ public class ShareGroupMember extends ModernGroupMember {
return this; return this;
} }
public Builder updateWith(ConsumerGroupCurrentMemberAssignmentValue record) {
setMemberEpoch(record.memberEpoch());
setPreviousMemberEpoch(record.previousMemberEpoch());
setState(MemberState.fromValue(record.state()));
setAssignedPartitions(Utils.assignmentFromTopicPartitions(record.assignedPartitions()));
return this;
}
public ShareGroupMember build() { public ShareGroupMember build() {
return new ShareGroupMember( return new ShareGroupMember(
memberId, memberId,

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -66,6 +67,18 @@ public class Assertions {
} }
} }
public static void assertResponseEquals(
ShareGroupHeartbeatResponseData expected,
ShareGroupHeartbeatResponseData actual
) {
if (!responseEquals(expected, actual)) {
assertionFailure()
.expected(expected)
.actual(actual)
.buildAndThrow();
}
}
private static boolean responseEquals( private static boolean responseEquals(
ConsumerGroupHeartbeatResponseData expected, ConsumerGroupHeartbeatResponseData expected,
ConsumerGroupHeartbeatResponseData actual ConsumerGroupHeartbeatResponseData actual
@ -80,6 +93,20 @@ public class Assertions {
return responseAssignmentEquals(expected.assignment(), actual.assignment()); return responseAssignmentEquals(expected.assignment(), actual.assignment());
} }
private static boolean responseEquals(
ShareGroupHeartbeatResponseData expected,
ShareGroupHeartbeatResponseData actual
) {
if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
if (expected.errorCode() != actual.errorCode()) return false;
if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false;
if (!Objects.equals(expected.memberId(), actual.memberId())) return false;
if (expected.memberEpoch() != actual.memberEpoch()) return false;
if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false;
// Unordered comparison of the assignments.
return responseAssignmentEquals(expected.assignment(), actual.assignment());
}
private static boolean responseAssignmentEquals( private static boolean responseAssignmentEquals(
ConsumerGroupHeartbeatResponseData.Assignment expected, ConsumerGroupHeartbeatResponseData.Assignment expected,
ConsumerGroupHeartbeatResponseData.Assignment actual ConsumerGroupHeartbeatResponseData.Assignment actual
@ -91,6 +118,17 @@ public class Assertions {
return Objects.equals(fromAssignment(expected.topicPartitions()), fromAssignment(actual.topicPartitions())); return Objects.equals(fromAssignment(expected.topicPartitions()), fromAssignment(actual.topicPartitions()));
} }
private static boolean responseAssignmentEquals(
ShareGroupHeartbeatResponseData.Assignment expected,
ShareGroupHeartbeatResponseData.Assignment actual
) {
if (expected == actual) return true;
if (expected == null) return false;
if (actual == null) return false;
return Objects.equals(fromShareGroupAssignment(expected.topicPartitions()), fromShareGroupAssignment(actual.topicPartitions()));
}
private static Map<Uuid, Set<Integer>> fromAssignment( private static Map<Uuid, Set<Integer>> fromAssignment(
List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
) { ) {
@ -103,6 +141,18 @@ public class Assertions {
return assignmentMap; return assignmentMap;
} }
private static Map<Uuid, Set<Integer>> fromShareGroupAssignment(
List<ShareGroupHeartbeatResponseData.TopicPartitions> assignment
) {
if (assignment == null) return null;
Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
assignment.forEach(topicPartitions -> {
assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()));
});
return assignmentMap;
}
public static void assertRecordsEquals( public static void assertRecordsEquals(
List<CoordinatorRecord> expectedRecords, List<CoordinatorRecord> expectedRecords,
List<CoordinatorRecord> actualRecords List<CoordinatorRecord> actualRecords

View File

@ -42,7 +42,8 @@ public class GroupCoordinatorConfigTest {
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF); GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF);
@Test @Test
public void testConfigs() { public void testConfigs() {
@ -199,6 +200,11 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000); configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 5000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name()); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id); configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 45);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, (short) 1000);
return createConfig(configs); return createConfig(configs);
} }

View File

@ -49,6 +49,8 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@ -191,7 +193,7 @@ public class GroupCoordinatorServiceTest {
assertEquals(new ConsumerGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS)); assertEquals(new ConsumerGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS));
} }
private static Stream<Arguments> testConsumerGroupHeartbeatWithExceptionSource() { private static Stream<Arguments> testGroupHeartbeatWithExceptionSource() {
return Stream.of( return Stream.of(
Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
@ -206,7 +208,7 @@ public class GroupCoordinatorServiceTest {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource") @MethodSource("testGroupHeartbeatWithExceptionSource")
public void testConsumerGroupHeartbeatWithException( public void testConsumerGroupHeartbeatWithException(
Throwable exception, Throwable exception,
short expectedErrorCode, short expectedErrorCode,
@ -1585,7 +1587,7 @@ public class GroupCoordinatorServiceTest {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource") @MethodSource("testGroupHeartbeatWithExceptionSource")
public void testDeleteOffsetsWithException( public void testDeleteOffsetsWithException(
Throwable exception, Throwable exception,
short expectedErrorCode short expectedErrorCode
@ -1727,7 +1729,7 @@ public class GroupCoordinatorServiceTest {
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource") @MethodSource("testGroupHeartbeatWithExceptionSource")
public void testDeleteGroupsWithException( public void testDeleteGroupsWithException(
Throwable exception, Throwable exception,
short expectedErrorCode short expectedErrorCode
@ -2103,4 +2105,97 @@ public class GroupCoordinatorServiceTest {
BufferSupplier.NO_CACHING BufferSupplier.NO_CACHING
)); ));
} }
@Test
public void testShareGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics()
);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
CompletableFuture<ShareGroupHeartbeatResponseData> future = service.shareGroupHeartbeat(
requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT),
request
);
assertEquals(new ShareGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), future.get());
}
@Test
public void testShareGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics()
);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
service.startup(() -> 1);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new ShareGroupHeartbeatResponseData()
));
CompletableFuture<ShareGroupHeartbeatResponseData> future = service.shareGroupHeartbeat(
requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT),
request
);
assertEquals(new ShareGroupHeartbeatResponseData(), future.get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@MethodSource("testGroupHeartbeatWithExceptionSource")
public void testShareGroupHeartbeatWithException(
Throwable exception,
short expectedErrorCode,
String expectedErrorMessage
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime,
new GroupCoordinatorMetrics()
);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData()
.setGroupId("foo");
service.startup(() -> 1);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(exception));
CompletableFuture<ShareGroupHeartbeatResponseData> future = service.shareGroupHeartbeat(
requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT),
request
);
assertEquals(
new ShareGroupHeartbeatResponseData()
.setErrorCode(expectedErrorCode)
.setErrorMessage(expectedErrorMessage),
future.get(5, TimeUnit.SECONDS)
);
}
} }

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
@ -48,6 +50,10 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
@ -1079,4 +1085,146 @@ public class GroupCoordinatorShardTest {
assertEquals(0, timer.size()); assertEquals(0, timer.size());
verify(groupMetadataManager, times(1)).onUnloaded(); verify(groupMetadataManager, times(1)).onUnloaded();
} }
@Test
public void testShareGroupHeartbeat() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
RequestContext context = requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData();
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = new CoordinatorResult<>(
Collections.emptyList(),
new ShareGroupHeartbeatResponseData()
);
when(groupMetadataManager.shareGroupHeartbeat(
context,
request
)).thenReturn(result);
assertEquals(result, coordinator.shareGroupHeartbeat(context, request));
}
@Test
public void testReplayShareGroupMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ShareGroupMetadataKey key = new ShareGroupMetadataKey();
ShareGroupMetadataValue value = new ShareGroupMetadataValue();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord(
new ApiMessageAndVersion(key, (short) 11),
new ApiMessageAndVersion(value, (short) 0)
));
verify(groupMetadataManager, times(1)).replay(key, value);
}
@Test
public void testReplayShareGroupMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ShareGroupMetadataKey key = new ShareGroupMetadataKey();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord(
new ApiMessageAndVersion(key, (short) 11),
null
));
verify(groupMetadataManager, times(1)).replay(key, null);
}
@Test
public void testReplayShareGroupMemberMetadata() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ShareGroupMemberMetadataKey key = new ShareGroupMemberMetadataKey();
ShareGroupMemberMetadataValue value = new ShareGroupMemberMetadataValue();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord(
new ApiMessageAndVersion(key, (short) 10),
new ApiMessageAndVersion(value, (short) 0)
));
verify(groupMetadataManager, times(1)).replay(key, value);
}
@Test
public void testReplayShareGroupMemberMetadataWithNullValue() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager,
Time.SYSTEM,
new MockCoordinatorTimer<>(Time.SYSTEM),
mock(GroupCoordinatorConfig.class),
coordinatorMetrics,
metricsShard
);
ShareGroupMemberMetadataKey key = new ShareGroupMemberMetadataKey();
coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new CoordinatorRecord(
new ApiMessageAndVersion(key, (short) 10),
null
));
verify(groupMetadataManager, times(1)).replay(key, null);
}
} }

View File

@ -50,6 +50,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
@ -59,6 +61,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.Group.GroupType;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ExpiredTimeout; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ExpiredTimeout;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ScheduledTimeout; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ScheduledTimeout;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
@ -78,6 +81,8 @@ import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataImage;
@ -124,7 +129,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGro
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
@ -3112,7 +3117,7 @@ public class GroupMetadataManagerTest {
// Verify the expired timeout. // Verify the expired timeout.
assertEquals( assertEquals(
Collections.singletonList(new ExpiredTimeout<Void, CoordinatorRecord>( Collections.singletonList(new ExpiredTimeout<Void, CoordinatorRecord>(
consumerGroupSessionTimeoutKey(groupId, memberId), groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>( new CoordinatorResult<>(
Arrays.asList( Arrays.asList(
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
@ -3193,7 +3198,7 @@ public class GroupMetadataManagerTest {
// Verify the expired timeout. // Verify the expired timeout.
assertEquals( assertEquals(
Collections.singletonList(new ExpiredTimeout<Void, CoordinatorRecord>( Collections.singletonList(new ExpiredTimeout<Void, CoordinatorRecord>(
consumerGroupSessionTimeoutKey(groupId, memberId), groupSessionTimeoutKey(groupId, memberId),
new CoordinatorResult<>( new CoordinatorResult<>(
Arrays.asList( Arrays.asList(
CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
@ -3554,8 +3559,8 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.onLoaded(); context.groupMetadataManager.onLoaded();
// All members should have a session timeout in place. // All members should have a session timeout in place.
assertNotNull(context.timer.timeout(consumerGroupSessionTimeoutKey("foo", "foo-1"))); assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-1")));
assertNotNull(context.timer.timeout(consumerGroupSessionTimeoutKey("foo", "foo-2"))); assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-2")));
// foo-1 should also have a revocation timeout in place. // foo-1 should also have a revocation timeout in place.
assertNotNull(context.timer.timeout(consumerGroupRebalanceTimeoutKey("foo", "foo-1"))); assertNotNull(context.timer.timeout(consumerGroupRebalanceTimeoutKey("foo", "foo-1")));
@ -8415,12 +8420,14 @@ public class GroupMetadataManagerTest {
public void testListGroups() { public void testListGroups() {
String consumerGroupId = "consumer-group-id"; String consumerGroupId = "consumer-group-id";
String classicGroupId = "classic-group-id"; String classicGroupId = "classic-group-id";
String shareGroupId = "share-group-id";
String memberId1 = Uuid.randomUuid().toString(); String memberId1 = Uuid.randomUuid().toString();
String fooTopicName = "foo"; String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range"); MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroupAssignors(Collections.singletonList(assignor))
.withShareGroupAssignor(assignor)
.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
.build(); .build();
@ -8435,6 +8442,8 @@ public class GroupMetadataManagerTest {
.setProtocol("range") .setProtocol("range")
.setCurrentStateTimestamp(context.time.milliseconds()), .setCurrentStateTimestamp(context.time.milliseconds()),
MetadataVersion.latestTesting())); MetadataVersion.latestTesting()));
// Create one share group record.
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(shareGroupId, 6, GroupType.SHARE));
context.commit(); context.commit();
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false); ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false);
context.replay(CoordinatorRecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) context.replay(CoordinatorRecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
@ -8458,7 +8467,12 @@ public class GroupMetadataManagerTest {
.setGroupId(consumerGroupId) .setGroupId(consumerGroupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()) .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
.setGroupType(Group.GroupType.CONSUMER.toString()) .setGroupType(Group.GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(shareGroupId)
.setProtocolType(ShareGroup.PROTOCOL_TYPE)
.setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
.setGroupType(Group.GroupType.SHARE.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap); assertEquals(expectAllGroupMap, actualAllGroupMap);
@ -8494,7 +8508,12 @@ public class GroupMetadataManagerTest {
.setGroupId(classicGroup.groupId()) .setGroupId(classicGroup.groupId())
.setProtocolType("classic") .setProtocolType("classic")
.setGroupState(EMPTY.toString()) .setGroupState(EMPTY.toString())
.setGroupType(Group.GroupType.CLASSIC.toString()) .setGroupType(Group.GroupType.CLASSIC.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(shareGroupId)
.setProtocolType(ShareGroup.PROTOCOL_TYPE)
.setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
.setGroupType(Group.GroupType.SHARE.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap); assertEquals(expectAllGroupMap, actualAllGroupMap);
@ -8525,6 +8544,18 @@ public class GroupMetadataManagerTest {
assertEquals(expectAllGroupMap, actualAllGroupMap); assertEquals(expectAllGroupMap, actualAllGroupMap);
actualAllGroupMap = context.sendListGroups(Collections.emptyList(), Collections.singletonList("Share")).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
expectAllGroupMap = Stream.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId(shareGroupId)
.setProtocolType(ShareGroup.PROTOCOL_TYPE)
.setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
.setGroupType(Group.GroupType.SHARE.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
actualAllGroupMap = context.sendListGroups(Arrays.asList("empty", "Assigning"), Collections.emptyList()).stream() actualAllGroupMap = context.sendListGroups(Arrays.asList("empty", "Assigning"), Collections.emptyList()).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
expectAllGroupMap = Stream.of( expectAllGroupMap = Stream.of(
@ -8537,7 +8568,12 @@ public class GroupMetadataManagerTest {
.setGroupId(consumerGroupId) .setGroupId(consumerGroupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString()) .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
.setGroupType(Group.GroupType.CONSUMER.toString()) .setGroupType(Group.GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(shareGroupId)
.setProtocolType(ShareGroup.PROTOCOL_TYPE)
.setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
.setGroupType(Group.GroupType.SHARE.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap); assertEquals(expectAllGroupMap, actualAllGroupMap);
@ -10752,7 +10788,7 @@ public class GroupMetadataManagerTest {
// Advance time past the session timeout. // Advance time past the session timeout.
// Member 2 should be fenced from the group, thus triggering the downgrade. // Member 2 should be fenced from the group, thus triggering the downgrade.
MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(45000 + 1).get(0); MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout = context.sleep(45000 + 1).get(0);
assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId2), timeout.key); assertEquals(groupSessionTimeoutKey(groupId, memberId2), timeout.key);
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList( byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 0),
@ -12943,7 +12979,7 @@ public class GroupMetadataManagerTest {
// The member is fenced from the group. // The member is fenced from the group.
assertEquals(1, timeouts.size()); assertEquals(1, timeouts.size());
MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0); MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord> timeout = timeouts.get(0);
assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId), timeout.key); assertEquals(groupSessionTimeoutKey(groupId, memberId), timeout.key);
assertRecordsEquals( assertRecordsEquals(
Arrays.asList( Arrays.asList(
// The member is removed. // The member is removed.
@ -13426,6 +13462,394 @@ public class GroupMetadataManagerTest {
assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type()); assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
} }
@Test
public void testShareGroupHeartbeatRequestValidation() {
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
Exception ex;
// GroupId must be present in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()));
assertEquals("GroupId can't be empty.", ex.getMessage());
// GroupId can't be all whitespaces.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(" ")));
assertEquals("GroupId can't be empty.", ex.getMessage());
// SubscribedTopicNames must be present and empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(0)));
assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
// MemberId must be non-empty in all requests except for the first one where it
// could be empty (epoch != 0).
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(1)));
assertEquals("MemberId can't be empty.", ex.getMessage());
// RackId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1)
.setRackId("")));
assertEquals("RackId can't be empty.", ex.getMessage());
}
@Test
public void testShareGroupMemberIdGeneration() {
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.emptyMap()
));
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("group-foo")
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
// Verify that a member id was generated for the new member.
String memberId = result.response().memberId();
assertNotNull(memberId);
assertNotEquals("", memberId);
// The response should get a bumped epoch and should not
// contain any assignment because we did not provide
// topics metadata.
assertEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ShareGroupHeartbeatResponseData.Assignment()),
result.response()
);
}
@Test
public void testShareGroupUnknownGroupId() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
assertThrows(IllegalStateException.class, () ->
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(100) // Epoch must be > 0.
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
}
@Test
public void testShareGroupUnknownMemberIdJoins() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(new NoOpPartitionAssignor())
.build();
// A first member joins to create the group.
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
// The second member is rejected because the member id is unknown and
// the member epoch is not zero.
assertThrows(UnknownMemberIdException.class, () ->
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
}
@Test
public void testShareGroupMemberRejoinAfterFailover() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(new NoOpPartitionAssignor())
.build();
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 100, GroupType.SHARE));
// Member is not known after the failover.
assertThrows(UnknownMemberIdException.class, () ->
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(100)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
// Member joins with epoch 0 to re-join the group.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertEquals(101, result.response().memberEpoch());
}
@Test
public void testShareGroupMemberJoinsEmptyGroupWithAssignments() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addRacks()
.build())
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)))
));
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.shareGroup(groupId));
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertResponseEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)),
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(Arrays.asList(0, 1, 2))
))),
result.response()
);
ShareGroupMember expectedMember = new ShareGroupMember.Builder(memberId)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.build();
List<CoordinatorRecord> expectedRecords = Arrays.asList(
CoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, expectedMember),
CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 1, GroupType.SHARE)
);
assertRecordsEquals(expectedRecords, result.records());
}
@Test
public void testShareGroupLeavingMemberBumpsGroupEpoch() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
// A share group cannot have pre-defined members and member metadata as members and assignments
// are not persisted.
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.emptyMap()
));
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 100, GroupType.SHARE));
// Member 1 joins the group.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertEquals(101, result.response().memberEpoch());
// Member 2 joins the group.
result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertEquals(102, result.response().memberEpoch());
// Member 2 leaves the consumer group.
result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertResponseEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
result.response()
);
List<CoordinatorRecord> expectedRecords = Arrays.asList(
CoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 103, GroupType.SHARE)
);
assertRecordsEquals(expectedRecords, result.records());
}
@Test
public void testShareGroupNewMemberIsRejectedWithMaximumMembersIsReached() {
String groupId = "fooup";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
// A share group cannot have pre-defined members and member metadata as members and assignments
// are not persisted.
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withShareGroupMaxSize(1)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.emptyMap()
));
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 100, GroupType.SHARE));
// Member 1 joins the group.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar")));
assertEquals(101, result.response().memberEpoch());
// Member 2 joins the group.
assertThrows(GroupMaxSizeReachedException.class, () -> context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
}
@Test
public void testShareGroupDelete() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.groupMetadataManager.getOrMaybeCreatePersistedShareGroup("share-group-id", true);
List<CoordinatorRecord> expectedRecords = Collections.singletonList(
CoordinatorRecordHelpers.newGroupEpochTombstoneRecord("share-group-id", GroupType.SHARE)
);
List<CoordinatorRecord> records = new ArrayList<>();
context.groupMetadataManager.createGroupTombstoneRecords("share-group-id", records);
assertEquals(expectedRecords, records);
}
@Test
public void testShareGroupStates() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("share-range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
Collections.emptyMap()
));
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 10, GroupType.SHARE), GroupType.SHARE);
assertEquals(ShareGroup.ShareGroupState.EMPTY, context.shareGroupState(groupId));
context.replay(CoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, new ShareGroupMember.Builder(memberId1)
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
.build()), GroupType.SHARE);
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11, GroupType.SHARE), GroupType.SHARE);
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
context.replay(CoordinatorRecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3))), GroupType.SHARE);
context.replay(CoordinatorRecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), GroupType.SHARE);
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
context.replay(CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, new ShareGroupMember.Builder(memberId1)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
.build()), GroupType.SHARE);
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
context.replay(CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId, new ShareGroupMember.Builder(memberId1)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2, 3)))
.build()), GroupType.SHARE);
assertEquals(ShareGroup.ShareGroupState.STABLE, context.shareGroupState(groupId));
}
private static void checkJoinGroupResponse( private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse, JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse, JoinGroupResponseData actualResponse,

View File

@ -33,6 +33,8 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ClientInformation;
@ -46,7 +48,9 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.Group.GroupType;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
@ -62,10 +66,15 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -91,8 +100,8 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupSessionTimeoutKey;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
@ -388,6 +397,9 @@ public class GroupMetadataManagerTestContext {
private int classicGroupMaxSessionTimeoutMs = 10 * 60 * 1000; private int classicGroupMaxSessionTimeoutMs = 10 * 60 * 1000;
private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class);
private ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.DISABLED; private ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.DISABLED;
// Share group configs
private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
private int shareGroupMaxSize = Integer.MAX_VALUE;
public Builder withMetadataImage(MetadataImage metadataImage) { public Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage; this.metadataImage = metadataImage;
@ -439,6 +451,16 @@ public class GroupMetadataManagerTestContext {
return this; return this;
} }
public Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssignor) {
this.shareGroupAssignor = shareGroupAssignor;
return this;
}
public Builder withShareGroupMaxSize(int shareGroupMaxSize) {
this.shareGroupMaxSize = shareGroupMaxSize;
return this;
}
public GroupMetadataManagerTestContext build() { public GroupMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (consumerGroupAssignors == null) consumerGroupAssignors = Collections.emptyList(); if (consumerGroupAssignors == null) consumerGroupAssignors = Collections.emptyList();
@ -466,11 +488,14 @@ public class GroupMetadataManagerTestContext {
.withClassicGroupNewMemberJoinTimeoutMs(classicGroupNewMemberJoinTimeoutMs) .withClassicGroupNewMemberJoinTimeoutMs(classicGroupNewMemberJoinTimeoutMs)
.withGroupCoordinatorMetricsShard(metrics) .withGroupCoordinatorMetricsShard(metrics)
.withConsumerGroupMigrationPolicy(consumerGroupMigrationPolicy) .withConsumerGroupMigrationPolicy(consumerGroupMigrationPolicy)
.withShareGroupAssignor(shareGroupAssignor)
.withShareGroupMaxSize(shareGroupMaxSize)
.build(), .build(),
classicGroupInitialRebalanceDelayMs, classicGroupInitialRebalanceDelayMs,
classicGroupNewMemberJoinTimeoutMs classicGroupNewMemberJoinTimeoutMs
); );
consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay)); consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
context.commit(); context.commit();
@ -528,6 +553,14 @@ public class GroupMetadataManagerTestContext {
.state(); .state();
} }
public ShareGroup.ShareGroupState shareGroupState(
String groupId
) {
return groupMetadataManager
.shareGroup(groupId)
.state();
}
public MemberState consumerGroupMemberState( public MemberState consumerGroupMemberState(
String groupId, String groupId,
String memberId String memberId
@ -568,6 +601,34 @@ public class GroupMetadataManagerTestContext {
return result; return result;
} }
public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
ShareGroupHeartbeatRequestData request
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.SHARE_GROUP_HEARTBEAT,
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(),
"client",
0
),
"1",
InetAddress.getLoopbackAddress(),
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
);
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.shareGroupHeartbeat(
context,
request
);
result.records().forEach(this::replay);
return result;
}
public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) { public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
time.sleep(ms); time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll(); List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll();
@ -585,7 +646,7 @@ public class GroupMetadataManagerTestContext {
long delayMs long delayMs
) { ) {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout = MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(consumerGroupSessionTimeoutKey(groupId, memberId)); timer.timeout(groupSessionTimeoutKey(groupId, memberId));
assertNotNull(timeout); assertNotNull(timeout);
assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs); assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs);
} }
@ -595,7 +656,7 @@ public class GroupMetadataManagerTestContext {
String memberId String memberId
) { ) {
MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout = MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> timeout =
timer.timeout(consumerGroupSessionTimeoutKey(groupId, memberId)); timer.timeout(groupSessionTimeoutKey(groupId, memberId));
assertNull(timeout); assertNull(timeout);
} }
@ -1357,6 +1418,13 @@ public class GroupMetadataManagerTestContext {
public void replay( public void replay(
CoordinatorRecord record CoordinatorRecord record
) {
replay(record, null);
}
public void replay(
CoordinatorRecord record,
GroupType groupType
) { ) {
ApiMessageAndVersion key = record.key(); ApiMessageAndVersion key = record.key();
ApiMessageAndVersion value = record.value(); ApiMessageAndVersion value = record.value();
@ -1390,28 +1458,46 @@ public class GroupMetadataManagerTestContext {
case ConsumerGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION: case ConsumerGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupPartitionMetadataKey) key.message(), (ConsumerGroupPartitionMetadataKey) key.message(),
(ConsumerGroupPartitionMetadataValue) messageOrNull(value) (ConsumerGroupPartitionMetadataValue) messageOrNull(value),
groupType != null ? groupType : GroupType.CONSUMER
); );
break; break;
case ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION: case ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupTargetAssignmentMemberKey) key.message(), (ConsumerGroupTargetAssignmentMemberKey) key.message(),
(ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value) (ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value),
groupType != null ? groupType : GroupType.CONSUMER
); );
break; break;
case ConsumerGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION: case ConsumerGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupTargetAssignmentMetadataKey) key.message(), (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
(ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value) (ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value),
groupType != null ? groupType : GroupType.CONSUMER
); );
break; break;
case ConsumerGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION: case ConsumerGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupCurrentMemberAssignmentKey) key.message(), (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
(ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value) (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value),
groupType != null ? groupType : GroupType.CONSUMER
);
break;
case ShareGroupMetadataKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay(
(ShareGroupMetadataKey) key.message(),
(ShareGroupMetadataValue) messageOrNull(value)
);
break;
case ShareGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION:
groupMetadataManager.replay(
(ShareGroupMemberMetadataKey) key.message(),
(ShareGroupMemberMetadataValue) messageOrNull(value)
); );
break; break;

View File

@ -21,13 +21,14 @@ import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAss
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
public class MockPartitionAssignor implements ConsumerGroupPartitionAssignor { public class MockPartitionAssignor implements ConsumerGroupPartitionAssignor, ShareGroupPartitionAssignor {
private final String name; private final String name;
private GroupAssignment prepareGroupAssignment = null; private GroupAssignment prepareGroupAssignment = null;

View File

@ -19,12 +19,13 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor { public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor, ShareGroupPartitionAssignor {
static final String NAME = "no-op"; static final String NAME = "no-op";
@Override @Override

View File

@ -53,6 +53,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,
CleanerConfig.CONFIG_DEF, CleanerConfig.CONFIG_DEF,
LogConfig.SERVER_CONFIG_DEF, LogConfig.SERVER_CONFIG_DEF,
ShareGroupConfig.CONFIG_DEF, ShareGroupConfig.CONFIG_DEF,

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@ -47,34 +46,6 @@ public class ShareGroupConfig {
public static final short SHARE_GROUP_MAX_GROUPS_DEFAULT = 10; public static final short SHARE_GROUP_MAX_GROUPS_DEFAULT = 10;
public static final String SHARE_GROUP_MAX_GROUPS_DOC = "The maximum number of share groups."; public static final String SHARE_GROUP_MAX_GROUPS_DOC = "The maximum number of share groups.";
public static final String SHARE_GROUP_MAX_SIZE_CONFIG = "group.share.max.size";
public static final short SHARE_GROUP_MAX_SIZE_DEFAULT = 200;
public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single share group can accommodate.";
public static final String SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.share.session.timeout.ms";
public static final int SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String SHARE_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the share group protocol.";
public static final String SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.share.min.session.timeout.ms";
public static final int SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for share group members.";
public static final String SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.share.max.session.timeout.ms";
public static final int SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000;
public static final String SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for share group members.";
public static final String SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.heartbeat.interval.ms";
public static final int SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members of a share group.";
public static final String SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.min.heartbeat.interval.ms";
public static final int SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum heartbeat interval for share group members.";
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG = "group.share.max.heartbeat.interval.ms";
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms"; public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.record.lock.duration.ms";
public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT = 30000; public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT = 30000;
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock duration in milliseconds for share groups."; public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock duration in milliseconds for share groups.";
@ -93,27 +64,13 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC) .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC);
private final boolean isShareGroupEnabled; private final boolean isShareGroupEnabled;
private final int shareGroupPartitionMaxRecordLocks; private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupDeliveryCountLimit; private final int shareGroupDeliveryCountLimit;
private final short shareGroupMaxGroups; private final short shareGroupMaxGroups;
private final short shareGroupMaxSize;
private final int shareGroupSessionTimeoutMs;
private final int shareGroupMinSessionTimeoutMs;
private final int shareGroupMaxSessionTimeoutMs;
private final int shareGroupHeartbeatIntervalMs;
private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs;
private final int shareGroupRecordLockDurationMs; private final int shareGroupRecordLockDurationMs;
private final int shareGroupMaxRecordLockDurationMs; private final int shareGroupMaxRecordLockDurationMs;
private final int shareGroupMinRecordLockDurationMs; private final int shareGroupMinRecordLockDurationMs;
@ -123,16 +80,9 @@ public class ShareGroupConfig {
shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG); shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG);
shareGroupSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG);
shareGroupMinSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
shareGroupMaxSessionTimeoutMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
shareGroupHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
shareGroupMinHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
shareGroupMaxHeartbeatIntervalMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG); shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMaxSize = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
validate(); validate();
} }
@ -153,34 +103,6 @@ public class ShareGroupConfig {
return shareGroupMaxGroups; return shareGroupMaxGroups;
} }
public short shareGroupMaxSize() {
return shareGroupMaxSize;
}
public int shareGroupSessionTimeoutMs() {
return shareGroupSessionTimeoutMs;
}
public int shareGroupMinSessionTimeoutMs() {
return shareGroupMinSessionTimeoutMs;
}
public int shareGroupMaxSessionTimeoutMs() {
return shareGroupMaxSessionTimeoutMs;
}
public int shareGroupHeartbeatIntervalMs() {
return shareGroupHeartbeatIntervalMs;
}
public int shareGroupMinHeartbeatIntervalMs() {
return shareGroupMinHeartbeatIntervalMs;
}
public int shareGroupMaxHeartbeatIntervalMs() {
return shareGroupMaxHeartbeatIntervalMs;
}
public int shareGroupRecordLockDurationMs() { public int shareGroupRecordLockDurationMs() {
return shareGroupRecordLockDurationMs; return shareGroupRecordLockDurationMs;
} }
@ -194,26 +116,6 @@ public class ShareGroupConfig {
} }
private void validate() { private void validate() {
Utils.require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
Utils.require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
Utils.require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equals to %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
Utils.require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
Utils.require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
Utils.require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
Utils.require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, Utils.require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
String.format("%s must be greater than or equals to %s", String.format("%s must be greater than or equals to %s",
SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG)); SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG));