diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 255537dd8d0..4fc539d6067 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -221,6 +221,7 @@ + diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java new file mode 100644 index 00000000000..2edf0612444 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -0,0 +1,1011 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; + +/** + * This class holds metadata for a generic group where the + * member assignment is driven solely from the client side. + * + * The APIs members use to make changes to the group membership + * consist of JoinGroup, SyncGroup, and LeaveGroup. + */ +public class GenericGroup implements Group { + + /** + * Empty generation. + */ + public static final int NO_GENERATION = -1; + + /** + * Protocol with empty name. + */ + public static final String NO_PROTOCOL_NAME = ""; + + /** + * No leader. + */ + public static final String NO_LEADER = ""; + + /** + * Delimiter used to join a randomly generated UUID + * with a client id or a group instance id. + */ + private static final String MEMBER_ID_DELIMITER = "-"; + + /** + * The slf4j logger. + */ + private final Logger log; + + /** + * The group id. + */ + private final String groupId; + + /** + * The time. + */ + private final Time time; + + /** + * The current group state. + */ + private GenericGroupState state; + + /** + * The timestamp of when the group transitioned + * to its current state. + */ + private Optional currentStateTimestamp; + + /** + * The protocol type used for rebalance. + */ + private Optional protocolType = Optional.empty(); + + /** + * The protocol name used for rebalance. + */ + private Optional protocolName = Optional.empty(); + + /** + * The generation id. + */ + private int generationId = 0; + + /** + * The id of the group's leader. + */ + private Optional leaderId = Optional.empty(); + + /** + * The members of the group. + */ + private final Map members = new HashMap<>(); + + /** + * The static members of the group. + */ + private final Map staticMembers = new HashMap<>(); + + /** + * Members who have yet to (re)join the group + * during the join group phase. + */ + private final Set pendingJoinMembers = new HashSet<>(); + + /** + * The number of members awaiting a join response. + */ + private int numMembersAwaitingJoinResponse = 0; + + /** + * Map of protocol names to the number of members that support them. + */ + private final Map supportedProtocols = new HashMap<>(); + + /** + * Members who have yet to sync with the group + * during the sync group phase. + */ + private final Set pendingSyncMembers = new HashSet<>(); + + /** + * The list of topics to which the group members are subscribed. + */ + private Optional> subscribedTopics = Optional.empty(); + + /** + * A flag to indiciate whether a new member was added. Used + * to further delay initial joins (new group). + */ + private boolean newMemberAdded = false; + + public GenericGroup( + LogContext logContext, + String groupId, + GenericGroupState initialState, + Time time + ) { + Objects.requireNonNull(logContext); + this.log = logContext.logger(GenericGroup.class); + this.groupId = Objects.requireNonNull(groupId); + this.state = Objects.requireNonNull(initialState); + this.time = Objects.requireNonNull(time); + this.currentStateTimestamp = Optional.of(time.milliseconds()); + } + + /** + * The type of this group. + * + * @return The group type (Generic). + */ + @Override + public GroupType type() { + return GroupType.GENERIC; + } + + /** + * The state of this group. + * + * @return The current state as a String. + */ + @Override + public String stateAsString() { + return this.state.toString(); + } + + /** + * @return the group id. + */ + public String groupId() { + return this.groupId; + } + + /** + * @return the generation id. + */ + public int generationId() { + return this.generationId; + } + + /** + * @return the protocol name. + */ + public Optional protocolName() { + return this.protocolName; + } + + /** + * @return the protocol type. + */ + public Optional protocolType() { + return this.protocolType; + } + + /** + * @return the current group state. + */ + public GenericGroupState currentState() { + return state; + } + + /** + * Compares the group's current state with the given state. + * + * @param groupState the state to match against. + * @return true if the state matches, false otherwise. + */ + public boolean isInState(GenericGroupState groupState) { + return this.state == groupState; + } + + /** + * To identify whether the given member id is part of this group. + * + * @param memberId the given member id. + * @return true if the member is part of this group, false otherwise. + */ + public boolean hasMemberId(String memberId) { + return members.containsKey(memberId); + } + + /** + * Get the member metadata associated with the provided member id. + * + * @param memberId the member id. + * @return the member metadata if it exists, null otherwise. + */ + public GenericGroupMember member(String memberId) { + return members.get(memberId); + } + + /** + * @return the total number of members in this group. + */ + public int size() { + return members.size(); + } + + /** + * Used to identify whether the given member is the leader of this group. + * + * @param memberId the member id. + * @return true if the member is the leader, false otherwise. + */ + public boolean isLeader(String memberId) { + return leaderId.map(id -> id.equals(memberId)).orElse(false); + } + + /** + * @return the leader id or null if a leader does not exist. + */ + public String leaderOrNull() { + return leaderId.orElse(null); + } + + /** + * @return the current state timestamp. + */ + public long currentStateTimestampOrDefault() { + return currentStateTimestamp.orElse(-1L); + } + + /** + * @return whether the group is using the consumer protocol. + */ + public boolean usesConsumerGroupProtocol() { + return protocolType.map(type -> + type.equals(ConsumerProtocol.PROTOCOL_TYPE) + ).orElse(false); + } + + /** + * Add a member to this group. + * + * @param member the member to add. + */ + public void add(GenericGroupMember member) { + add(member, null); + } + + /** + * Add a member to this group. + * + * @param member the member to add. + * @param future the future to complete once the join group phase completes. + */ + public void add(GenericGroupMember member, CompletableFuture future) { + member.groupInstanceId().ifPresent(instanceId -> { + if (staticMembers.containsKey(instanceId)) { + throw new IllegalStateException("Static member with groupInstanceId=" + + instanceId + " cannot be added to group " + groupId + " since" + + " it is already a member."); + } + staticMembers.put(instanceId, member.memberId()); + }); + + if (members.isEmpty()) { + this.protocolType = Optional.of(member.protocolType()); + } + + if (!Objects.equals(this.protocolType.orElse(null), member.protocolType())) { + throw new IllegalStateException("The group and member's protocol type must be the same."); + } + + if (!supportsProtocols(member)) { + throw new IllegalStateException("None of the member's protocols can be supported."); + } + + if (!leaderId.isPresent()) { + leaderId = Optional.of(member.memberId()); + } + + members.put(member.memberId(), member); + incrementSupportedProtocols(member); + member.setAwaitingJoinFuture(future); + + if (member.isAwaitingJoin()) { + numMembersAwaitingJoinResponse++; + } + + pendingJoinMembers.remove(member.memberId()); + } + + /** + * Remove a member from the group. + * + * @param memberId the member id to remove. + */ + public void remove(String memberId) { + GenericGroupMember removedMember = members.remove(memberId); + if (removedMember != null) { + decrementSupportedProtocols(removedMember); + if (removedMember.isAwaitingJoin()) { + numMembersAwaitingJoinResponse--; + } + + removedMember.groupInstanceId().ifPresent(staticMembers::remove); + } + + if (isLeader(memberId)) { + Iterator iter = members.keySet().iterator(); + String newLeader = iter.hasNext() ? iter.next() : null; + leaderId = Optional.ofNullable(newLeader); + } + + pendingJoinMembers.remove(memberId); + pendingSyncMembers.remove(memberId); + } + + /** + * Check whether current leader has rejoined. If not, choose a + * new leader from one of the joined members. + * + * Return false if + * 1. the group is currently empty (has no designated leader) + * 2. no member rejoined + * + * @return true if a new leader was elected or the existing + * leader rejoined, false otherwise. + */ + public boolean maybeElectNewJoinedLeader() { + if (leaderId.isPresent()) { + GenericGroupMember currentLeader = member(leaderId.get()); + if (!currentLeader.isAwaitingJoin()) { + for (GenericGroupMember member : members.values()) { + if (member.isAwaitingJoin()) { + leaderId = Optional.of(member.memberId()); + log.info("Group leader [memberId: {}, groupInstanceId: {}] " + + "failed to join before the rebalance timeout. Member {} " + + "was elected as the new leader.", + currentLeader.memberId(), + currentLeader.groupInstanceId().orElse("None"), + member + ); + return true; + } + } + log.info("Group leader [memberId: {}, groupInstanceId: {}] " + + "failed to join before the rebalance timeout and the " + + "group couldn't proceed to the next generation because " + + "no member joined.", + currentLeader.memberId(), + currentLeader.groupInstanceId().orElse("None") + ); + return false; + } + return true; + } + return false; + } + + /** + * [For static members only]: Replace the old member id with the new one, + * keep everything else unchanged and return the updated member. + * + * @param groupInstanceId the group instance id. + * @param oldMemberId the old member id. + * @param newMemberId the new member id that will replace the old member id. + * @return the old member. + */ + public GenericGroupMember replaceStaticMember( + String groupInstanceId, + String oldMemberId, + String newMemberId + ) { + GenericGroupMember removedMember = members.remove(oldMemberId); + if (removedMember == null) { + throw new IllegalArgumentException("Cannot replace non-existing member id " + oldMemberId); + } + + // Fence potential duplicate member immediately if someone awaits join/sync future. + JoinGroupResponseData joinGroupResponse = new JoinGroupResponseData() + .setMembers(Collections.emptyList()) + .setMemberId(oldMemberId) + .setGenerationId(NO_GENERATION) + .setProtocolName(null) + .setProtocolType(null) + .setLeader(NO_LEADER) + .setSkipAssignment(false) + .setErrorCode(Errors.FENCED_INSTANCE_ID.code()); + completeJoinFuture(removedMember, joinGroupResponse); + + SyncGroupResponseData syncGroupResponse = new SyncGroupResponseData() + .setAssignment(new byte[0]) + .setProtocolName(null) + .setProtocolType(null) + .setErrorCode(Errors.FENCED_INSTANCE_ID.code()); + completeSyncFuture(removedMember, syncGroupResponse); + + GenericGroupMember newMember = new GenericGroupMember( + newMemberId, + removedMember.groupInstanceId(), + removedMember.clientId(), + removedMember.clientHost(), + removedMember.rebalanceTimeoutMs(), + removedMember.sessionTimeoutMs(), + removedMember.protocolType(), + removedMember.supportedProtocols(), + removedMember.assignment() + ); + + members.put(newMemberId, newMember); + + if (isLeader(oldMemberId)) { + leaderId = Optional.of(newMemberId); + } + + staticMembers.put(groupInstanceId, newMemberId); + return removedMember; + } + + /** + * Check whether a member has joined the group. + * + * @param memberId the member id. + * @return true if the member has yet to join, false otherwise. + */ + public boolean isPendingMember(String memberId) { + return pendingJoinMembers.contains(memberId); + } + + /** + * Add a pending member. + * + * @param memberId the member id. + * @return true if the group did not already have the pending member, + * false otherwise. + */ + public boolean addPendingMember(String memberId) { + if (hasMemberId(memberId)) { + throw new IllegalStateException("Attept to add pending member " + memberId + + " which is already a stable member of the group."); + } + return pendingJoinMembers.add(memberId); + } + + /** + * @return number of members that are pending join. + */ + public int numPending() { + return pendingJoinMembers.size(); + } + + /** + * Add a pending sync member. + * + * @param memberId the member id. + * @return true if the group did not already have the pending sync member, + * false otherwise. + */ + public boolean addPendingSyncMember(String memberId) { + if (!hasMemberId(memberId)) { + throw new IllegalStateException("Attept to add pending sync member " + memberId + + " which is already a stable member of the group."); + } + + return pendingSyncMembers.add(memberId); + } + + /** + * Remove a member that has not yet synced. + * + * @param memberId the member id. + * @return true if the group did store this member, false otherwise. + */ + public boolean removePendingSyncMember(String memberId) { + if (!hasMemberId(memberId)) { + throw new IllegalStateException("Attept to add pending member " + memberId + + " which is already a stable member of the group."); + } + return pendingSyncMembers.remove(memberId); + } + + /** + * @return true if all members have sent sync group requests, + * false otherwise. + */ + public boolean hasReceivedSyncFromAllMembers() { + return pendingSyncMembers.isEmpty(); + } + + /** + * @return members that have yet to sync. + */ + public Set allPendingSyncMembers() { + return pendingSyncMembers; + } + + /** + * Clear members pending sync. + */ + public void clearPendingSyncMembers() { + pendingSyncMembers.clear(); + } + + /** + * Checks whether the given group instance id exists as + * a static member. + * + * @param groupInstanceId the group instance id. + * @return true if a static member with the group instance id exists, + * false otherwise. + */ + public boolean hasStaticMember(String groupInstanceId) { + return staticMembers.containsKey(groupInstanceId); + } + + /** + * Get member id of a static member that matches the given group + * instance id. + * + * @param groupInstanceId the group instance id. + * @return the static member if it exists. + */ + public String staticMemberId(String groupInstanceId) { + return staticMembers.get(groupInstanceId); + } + + /** + * @return members who have yet to rejoin during the + * join group phase. + */ + public Map notYetRejoinedMembers() { + Map notYetRejoinedMembers = new HashMap<>(); + members.values().forEach(member -> { + if (!member.isAwaitingJoin()) { + notYetRejoinedMembers.put(member.memberId(), member); + } + }); + return notYetRejoinedMembers; + } + + /** + * @return whether all members have joined. + */ + public boolean hasAllMembersJoined() { + return members.size() == numMembersAwaitingJoinResponse && pendingJoinMembers.isEmpty(); + } + + /** + * @return the ids of all members in the group. + */ + public Set allMemberIds() { + return members.keySet(); + } + + /** + * @return all static members in the group. + */ + public Set allStaticMemberIds() { + return staticMembers.keySet(); + } + + // For testing only. + Set allDynamicMemberIds() { + Set dynamicMemberSet = new HashSet<>(allMemberIds()); + staticMembers.values().forEach(dynamicMemberSet::remove); + return dynamicMemberSet; + } + + /** + * @return number of members waiting for a join group response. + */ + public int numAwaitingJoinResponse() { + return numMembersAwaitingJoinResponse; + } + + /** + * @return all members. + */ + public Collection allMembers() { + return members.values(); + } + + /** + * @return the group's rebalance timeout in milliseconds. + * It is the max of all members' rebalance timeout. + */ + public int rebalanceTimeoutMs() { + int maxRebalanceTimeoutMs = 0; + for (GenericGroupMember member : members.values()) { + maxRebalanceTimeoutMs = Math.max(maxRebalanceTimeoutMs, member.rebalanceTimeoutMs()); + } + return maxRebalanceTimeoutMs; + } + + /** + * Generate a member id from the given client and group instance ids. + * + * @param clientId the client id. + * @param groupInstanceId the group instance id. + * @return the generated id. + */ + public String generateMemberId(String clientId, Optional groupInstanceId) { + return groupInstanceId.map(s -> s + MEMBER_ID_DELIMITER + UUID.randomUUID()) + .orElseGet(() -> clientId + MEMBER_ID_DELIMITER + UUID.randomUUID()); + } + + /** + * Verify the member id is up to date for static members. Return true if both conditions met: + * 1. given member is a known static member to group + * 2. group stored member id doesn't match with given member id + * + * @param groupInstanceId the group instance id. + * @param memberId the member id. + * @return whether the static member is fenced based on the condition above. + */ + public boolean isStaticMemberFenced( + String groupInstanceId, + String memberId + ) { + String existingMemberId = staticMemberId(groupInstanceId); + return existingMemberId != null && !existingMemberId.equals(memberId); + } + + /** + * @return whether the group can rebalance. + */ + public boolean canRebalance() { + return PREPARING_REBALANCE.validPreviousStates().contains(state); + } + + /** + * Transition to a group state. + * @param groupState the group state. + */ + public void transitionTo(GenericGroupState groupState) { + assertValidTransition(groupState); + state = groupState; + currentStateTimestamp = Optional.of(time.milliseconds()); + } + + /** + * Select a protocol that will be used for this group. Each member + * will vote for a protocol and the one with the most votes will + * be selected. Only a protocol that is supported by all members + * can be selected. + * + * @return the name of the selected protocol. + */ + public String selectProtocol() { + if (members.isEmpty()) { + throw new IllegalStateException("Cannot select protocol for empty group"); + } + + // select the protocol for this group which is supported by all members + Set candidates = candidateProtocols(); + + // let each member vote for one of the protocols + Map votesByProtocol = new HashMap<>(); + allMembers().stream().map(member -> member.vote(candidates)) + .forEach(protocolName -> { + int count = votesByProtocol.getOrDefault(protocolName, 0); + votesByProtocol.put(protocolName, count + 1); + }); + + // choose the one with the most votes + return votesByProtocol.entrySet().stream() + .max(Comparator.comparingInt(Map.Entry::getValue)) + .map(Map.Entry::getKey).orElse(null); + } + + /** + * Increment the protocol count for all of the member's + * supported protocols. + * + * @param member the member. + */ + private void incrementSupportedProtocols(GenericGroupMember member) { + member.supportedProtocols().forEach(protocol -> { + int count = supportedProtocols.getOrDefault(protocol.name(), 0); + supportedProtocols.put(protocol.name(), count + 1); + }); + } + + /** + * Decrement the protocol count for all of the member's + * supported protocols. + * + * @param member the member. + */ + private void decrementSupportedProtocols(GenericGroupMember member) { + member.supportedProtocols().forEach(protocol -> { + int count = supportedProtocols.getOrDefault(protocol.name(), 0); + supportedProtocols.put(protocol.name(), count - 1); + }); + } + + /** + * A candidate protocol must be supported by all members. + * + * @return a set of candidate protocols that can be chosen as the protocol + * for the group. + */ + private Set candidateProtocols() { + // get the set of protocols that are commonly supported by all members + return supportedProtocols.entrySet().stream() + .filter(protocol -> protocol.getValue() == members.size()) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + } + + /** + * Checks whether at least one of the given protocols can be supported. A + * protocol can be supported if it is supported by all members. + * + * @param member the member to check. + * @return a boolean based on the condition mentioned above. + */ + public boolean supportsProtocols(GenericGroupMember member) { + return supportsProtocols( + member.protocolType(), + GenericGroupMember.plainProtocolSet(member.supportedProtocols()) + ); + } + + /** + * Checks whether at least one of the given protocols can be supported. A + * protocol can be supported if it is supported by all members. + * + * @param memberProtocolType the member protocol type. + * @param memberProtocols the set of protocol names. + * @return a boolean based on the condition mentioned above. + */ + public boolean supportsProtocols(String memberProtocolType, Set memberProtocols) { + if (isInState(EMPTY)) { + return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty(); + } else { + return protocolType.map(type -> type.equals(memberProtocolType)).orElse(false) && + memberProtocols.stream() + .anyMatch(name -> supportedProtocols.getOrDefault(name, 0) == members.size()); + } + } + + /** + * @return the topics that the group is subscribed to. + */ + public Optional> subscribedTopics() { + return subscribedTopics; + } + + /** + * Returns true if the consumer group is actively subscribed to the topic. When the consumer + * group does not know, because the information is not available yet or because the it has + * failed to parse the Consumer Protocol, it returns true to be safe. + * + * @param topic the topic name. + * @return whether the group is subscribed to the topic. + */ + public boolean isSubscribedToTopic(String topic) { + return subscribedTopics.map(topics -> topics.contains(topic)) + .orElse(true); + } + + /** + * Collects the set of topics that the members are subscribed to when the Protocol Type is equal + * to 'consumer'. None is returned if + * - the protocol type is not equal to 'consumer'; + * - the protocol is not defined yet; or + * - the protocol metadata does not comply with the schema. + * + * @return the subscribed topics or None based on the condition above. + */ + Optional> computeSubscribedTopics() { + if (!protocolType.isPresent()) { + return Optional.empty(); + } + String type = protocolType.get(); + if (!type.equals(ConsumerProtocol.PROTOCOL_TYPE)) { + return Optional.empty(); + } + if (members.isEmpty()) { + return Optional.of(Collections.emptySet()); + } + + if (protocolName.isPresent()) { + try { + Set allSubscribedTopics = new HashSet<>(); + members.values().forEach(member -> { + ByteBuffer buffer = ByteBuffer.wrap(member.metadata(protocolName.get())); + ConsumerProtocol.deserializeVersion(buffer); + allSubscribedTopics.addAll(new HashSet<>( + ConsumerProtocol.deserializeSubscription(buffer, (short) 0).topics() + )); + }); + return Optional.of(allSubscribedTopics); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" + + protocolName.get() + " of group " + groupId + ". " + + "Consumer group coordinator is not aware of the subscribed topics.", e); + } + } + + return Optional.empty(); + } + + /** + * Update a member. + * + * @param member the member. + * @param protocols the list of protocols. + * @param rebalanceTimeoutMs the rebalance timeout in milliseconds. + * @param sessionTimeoutMs the session timeout in milliseconds. + * @param future the future that is invoked once the join phase is complete. + */ + public void updateMember( + GenericGroupMember member, + List protocols, + int rebalanceTimeoutMs, + int sessionTimeoutMs, + CompletableFuture future + ) { + decrementSupportedProtocols(member); + member.setSupportedProtocols(protocols); + incrementSupportedProtocols(member); + member.setRebalanceTimeoutMs(rebalanceTimeoutMs); + member.setSessionTimeoutMs(sessionTimeoutMs); + + if (future != null && !member.isAwaitingJoin()) { + numMembersAwaitingJoinResponse++; + } else if (future == null && member.isAwaitingJoin()) { + numMembersAwaitingJoinResponse--; + } + member.setAwaitingJoinFuture(future); + } + + /** + * Complete the join future. + * + * @param member the member. + * @param response the join response to complete the future with. + * @return true if a join future actually completes. + */ + public boolean completeJoinFuture( + GenericGroupMember member, + JoinGroupResponseData response + ) { + if (member.isAwaitingJoin()) { + member.awaitingJoinFuture().complete(response); + member.setAwaitingJoinFuture(null); + numMembersAwaitingJoinResponse--; + return true; + } + return false; + } + + /** + * Complete a member's sync future. + * + * @param member the member. + * @param response the sync response to complete the future with. + * @return true if a sync future actually completes. + */ + public boolean completeSyncFuture( + GenericGroupMember member, + SyncGroupResponseData response + ) { + if (member.isAwaitingSync()) { + member.awaitingSyncFuture().complete(response); + member.setAwaitingSyncFuture(null); + return true; + } + return false; + } + + /** + * Initiate the next generation for the group. + */ + public void initNextGeneration() { + generationId++; + if (!members.isEmpty()) { + protocolName = Optional.of(selectProtocol()); + subscribedTopics = computeSubscribedTopics(); + transitionTo(COMPLETING_REBALANCE); + } else { + protocolName = Optional.empty(); + subscribedTopics = computeSubscribedTopics(); + transitionTo(EMPTY); + } + clearPendingSyncMembers(); + } + + /** + * Get all members formatted as a join response. + * + * @return the members. + */ + public List currentGenericGroupMembers() { + if (isInState(DEAD) || isInState(PREPARING_REBALANCE)) { + throw new IllegalStateException("Cannot obtain generic member metadata for group " + + groupId + " in state " + state); + } + + return members.values().stream().map(member -> + new JoinGroupResponseMember() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId().orElse(null)) + .setMetadata(member.metadata(protocolName.orElse(null)))) + .collect(Collectors.toList()); + } + + /** + * @return the group formatted as a list group response. + */ + public ListGroupsResponseData.ListedGroup asListedGroup() { + return new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType(protocolType.orElse("")) + .setGroupState(state.toString()); + } + + /** + * Checks whether the transition to the target state is valid. + * + * @param targetState the target state to transition to. + */ + private void assertValidTransition(GenericGroupState targetState) { + if (!targetState.validPreviousStates().contains(state)) { + throw new IllegalStateException("Group " + groupId + " should be in one of " + + targetState.validPreviousStates() + " states before moving to " + targetState + + " state. Instead it is in " + state + " state."); + } + } + + @Override + public String toString() { + return "GenericGroupMetadata(" + + "groupId=" + groupId + ", " + + "generation=" + generationId + ", " + + "protocolType=" + protocolType + ", " + + "currentState=" + currentState() + ", " + + "members=" + members + ")"; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java new file mode 100644 index 00000000000..74730b5a891 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Represents all states that a generic group can be in, as well as the states that a group must + * be in to transition to a particular state. + */ +public enum GenericGroupState { + + /** + * Group has no more members, but lingers until all offsets have expired. This state + * also represents groups which use Kafka only for offset commits and have no members. + * + * action: respond normally to join group from new members + * respond to sync group with UNKNOWN_MEMBER_ID + * respond to heartbeat with UNKNOWN_MEMBER_ID + * respond to leave group with UNKNOWN_MEMBER_ID + * respond to offset commit with UNKNOWN_MEMBER_ID + * allow offset fetch requests + * transition: last offsets removed in periodic expiration task => DEAD + * join group from a new member => PREPARING_REBALANCE + * group is removed by partition emigration => DEAD + * group is removed by expiration => DEAD + */ + EMPTY("Empty"), + + /** + * Group is preparing to rebalance. + * + * action: respond to heartbeats with REBALANCE_IN_PROGRESS + * respond to sync group with REBALANCE_IN_PROGRESS + * remove member on leave group request + * park join group requests from new or existing members until all expected members have joined + * allow offset commits from previous generation + * allow offset fetch requests + * transition: some members have joined by the timeout => COMPLETING_REBALANCE + * all members have left the group => EMPTY + * group is removed by partition emigration => DEAD + */ + PREPARING_REBALANCE("PreparingRebalance"), + + /** + * Group is awaiting state assignment from the leader. + * + * action: respond to heartbeats with REBALANCE_IN_PROGRESS + * respond to offset commits with REBALANCE_IN_PROGRESS + * park sync group requests from followers until transition to STABLE + * allow offset fetch requests + * transition: sync group with state assignment received from leader => STABLE + * join group from new member or existing member with updated metadata => PREPARING_REBALANCE + * leave group from existing member => PREPARING_REBALANCE + * member failure detected => PREPARING_REBALANCE + * group is removed by partition emigration => DEAD + */ + COMPLETING_REBALANCE("CompletingRebalance"), + + /** + * Group is stable. + * + * action: respond to member heartbeats normally + * respond to sync group from any member with current assignment + * respond to join group from followers with matching metadata with current group metadata + * allow offset commits from member of current generation + * allow offset fetch requests + * transition: member failure detected via heartbeat => PREPARING_REBALANCE + * leave group from existing member => PREPARING_REBALANCE + * leader join-group received => PREPARING_REBALANCE + * follower join-group with new metadata => PREPARING_REBALANCE + * group is removed by partition emigration => DEAD + */ + STABLE("Stable"), + + /** + * Group has no more members and its metadata is being removed. + * + * action: respond to join group with UNKNOWN_MEMBER_ID + * respond to sync group with UNKNOWN_MEMBER_ID + * respond to heartbeat with UNKNOWN_MEMBER_ID + * respond to leave group with UNKNOWN_MEMBER_ID + * respond to offset commit with UNKNOWN_MEMBER_ID + * allow offset fetch requests + * transition: DEAD is a final state before group metadata is cleaned up, so there are no transitions + */ + DEAD("Dead"); + + private final String name; + private Set validPreviousStates; + + static { + EMPTY.addValidPreviousStates(PREPARING_REBALANCE); + PREPARING_REBALANCE.addValidPreviousStates(STABLE, COMPLETING_REBALANCE, EMPTY); + COMPLETING_REBALANCE.addValidPreviousStates(PREPARING_REBALANCE); + STABLE.addValidPreviousStates(COMPLETING_REBALANCE); + DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD); + } + + GenericGroupState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + private void addValidPreviousStates(GenericGroupState... validPreviousStates) { + this.validPreviousStates = new HashSet<>(Arrays.asList(validPreviousStates)); + } + + /** + * @return valid previous states a group must be in to transition to this state. + */ + public Set validPreviousStates() { + return this.validPreviousStates; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java new file mode 100644 index 00000000000..1c12deb13e8 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java @@ -0,0 +1,966 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GenericGroupTest { + private final String protocolType = "consumer"; + private final String groupInstanceId = "groupInstanceId"; + private final String memberId = "memberId"; + private final String clientId = "clientId"; + private final String clientHost = "clientHost"; + private final int rebalanceTimeoutMs = 60000; + private final int sessionTimeoutMs = 10000; + + private GenericGroup group = null; + + @BeforeEach + public void initialize() { + group = new GenericGroup(new LogContext(), "groupId", EMPTY, Time.SYSTEM); + } + + @Test + public void testCanRebalanceWhenStable() { + assertTrue(group.canRebalance()); + } + + @Test + public void testCanRebalanceWhenCompletingRebalance() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + assertTrue(group.canRebalance()); + } + + @Test + public void testCannotRebalanceWhenPreparingRebalance() { + group.transitionTo(PREPARING_REBALANCE); + assertFalse(group.canRebalance()); + } + + @Test + public void testCannotRebalanceWhenDead() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(EMPTY); + group.transitionTo(DEAD); + assertFalse(group.canRebalance()); + } + + @Test + public void testStableToPreparingRebalanceTransition() { + group.transitionTo(PREPARING_REBALANCE); + assertState(group, PREPARING_REBALANCE); + } + + @Test + public void testStableToDeadTransition() { + group.transitionTo(DEAD); + assertState(group, DEAD); + } + + @Test + public void testAwaitingRebalanceToPreparingRebalanceTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(PREPARING_REBALANCE); + assertState(group, PREPARING_REBALANCE); + } + + @Test + public void testPreparingRebalanceToDeadTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(DEAD); + assertState(group, DEAD); + } + + @Test + public void testPreparingRebalanceToEmptyTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(EMPTY); + assertState(group, EMPTY); + } + + @Test + public void testEmptyToDeadTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(EMPTY); + group.transitionTo(DEAD); + assertState(group, DEAD); + } + + @Test + public void testAwaitingRebalanceToStableTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + assertState(group, STABLE); + } + + @Test + public void testEmptyToStableIllegalTransition() { + assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE)); + } + + @Test + public void testStableToStableIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + group.transitionTo(STABLE); + assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE)); + } + + @Test + public void testEmptyToAwaitingRebalanceIllegalTransition() { + assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE)); + } + + @Test + public void testPreparingRebalanceToPreparingRebalanceIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + assertThrows(IllegalStateException.class, () -> group.transitionTo(PREPARING_REBALANCE)); + } + + @Test + public void testPreparingRebalanceToStableIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE)); + } + + @Test + public void testAwaitingRebalanceToAwaitingRebalanceIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(COMPLETING_REBALANCE); + assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE)); + } + + @Test + public void testDeadToDeadIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(DEAD); + group.transitionTo(DEAD); + assertState(group, DEAD); + } + + @Test + public void testDeadToStableIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(DEAD); + assertThrows(IllegalStateException.class, () -> group.transitionTo(STABLE)); + } + + @Test + public void testDeadToPreparingRebalanceIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(DEAD); + assertThrows(IllegalStateException.class, () -> group.transitionTo(PREPARING_REBALANCE)); + } + + @Test + public void testDeadToAwaitingRebalanceIllegalTransition() { + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(DEAD); + assertThrows(IllegalStateException.class, () -> group.transitionTo(COMPLETING_REBALANCE)); + } + + @Test + public void testSelectProtocol() { + List member1Protocols = Arrays.asList( + new Protocol("range", new byte[0]), + new Protocol("roundrobin", new byte[0]) + ); + + GenericGroupMember member1 = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member1Protocols + ); + group.add(member1); + + List member2Protocols = Arrays.asList( + new Protocol("roundrobin", new byte[0]), + new Protocol("range", new byte[0]) + ); + + GenericGroupMember member2 = new GenericGroupMember( + "member2", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member2Protocols + ); + group.add(member2); + + // now could be either range or robin since there is no majority preference + assertTrue(group.selectProtocol().equals("range") || + group.selectProtocol().equals("roundrobin")); + + GenericGroupMember member3 = new GenericGroupMember( + "member3", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member2Protocols + ); + group.add(member3); + + // now we should prefer 'roundrobin' + assertEquals("roundrobin", group.selectProtocol()); + } + + @Test + public void testSelectProtocolRaisesIfNoMembers() { + assertThrows(IllegalStateException.class, () -> group.selectProtocol()); + } + + @Test + public void testSelectProtocolChoosesCompatibleProtocol() { + List member1Protocols = Arrays.asList( + new Protocol("range", new byte[0]), + new Protocol("roundrobin", new byte[0]) + ); + + GenericGroupMember member1 = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member1Protocols + ); + group.add(member1); + + List member2Protocols = Arrays.asList( + new Protocol("roundrobin", new byte[0]), + new Protocol("blah", new byte[0]) + ); + + GenericGroupMember member2 = new GenericGroupMember( + "member2", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member2Protocols + ); + group.add(member2); + + assertEquals("roundrobin", group.selectProtocol()); + } + + @Test + public void testSupportsProtocols() { + List member1Protocols = Arrays.asList( + new Protocol("range", new byte[0]), + new Protocol("roundrobin", new byte[0]) + ); + + GenericGroupMember member1 = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + member1Protocols + ); + + // by default, the group supports everything + assertTrue(group.supportsProtocols(protocolType, mkSet("range", "roundrobin"))); + + group.add(member1); + group.transitionTo(PREPARING_REBALANCE); + + assertTrue(group.supportsProtocols(protocolType, mkSet("roundrobin", "foo"))); + assertTrue(group.supportsProtocols(protocolType, mkSet("range", "bar"))); + assertFalse(group.supportsProtocols(protocolType, mkSet("foo", "bar"))); + } + + @Test + public void testSubscribedTopics() { + // not able to compute it for a newly created group + assertEquals(Optional.empty(), group.subscribedTopics()); + + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "range", + ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo") + ) + ).array() + ) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.add(member); + + group.initNextGeneration(); + + Set expectedTopics = new HashSet<>(Collections.singleton("foo")); + assertEquals(expectedTopics, group.subscribedTopics().get()); + + group.transitionTo(PREPARING_REBALANCE); + group.remove(memberId); + + group.initNextGeneration(); + + assertEquals(Optional.of(Collections.emptySet()), group.subscribedTopics()); + + GenericGroupMember memberWithFaultyProtocol = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "range", + new byte[0] + ) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.add(memberWithFaultyProtocol); + + group.initNextGeneration(); + + assertEquals(Optional.empty(), group.subscribedTopics()); + } + + @Test + public void testSubscribedTopicsNonConsumerGroup() { + // not able to compute it for a newly created group + assertEquals(Optional.empty(), group.subscribedTopics()); + + GenericGroupMember memberWithNonConsumerProtocol = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + "My Protocol", + Collections.singletonList( + new Protocol( + "range", + new byte[0] + ) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.add(memberWithNonConsumerProtocol); + + group.initNextGeneration(); + + assertEquals(Optional.empty(), group.subscribedTopics()); + } + + @Test + public void testInitNextGeneration() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.transitionTo(PREPARING_REBALANCE); + group.add(member, new CompletableFuture<>()); + + assertEquals(0, group.generationId()); + assertNull(group.protocolName().orElse(null)); + + group.initNextGeneration(); + + assertEquals(1, group.generationId()); + assertEquals("roundrobin", group.protocolName().orElse(null)); + } + + @Test + public void testInitNextGenerationEmptyGroup() { + assertEquals(EMPTY, group.currentState()); + assertEquals(0, group.generationId()); + assertNull(group.protocolName().orElse(null)); + + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); + + assertEquals(1, group.generationId()); + assertNull(group.protocolName().orElse(null)); + } + + @Test + public void testUpdateMember() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + + List newProtocols = Arrays.asList( + new Protocol( + "range", + new byte[0] + ), + new Protocol( + "roundrobin", + new byte[0] + ) + ); + int newRebalanceTimeoutMs = 120000; + int newSessionTimeoutMs = 20000; + group.updateMember(member, newProtocols, newRebalanceTimeoutMs, newSessionTimeoutMs, null); + + assertEquals(group.rebalanceTimeoutMs(), newRebalanceTimeoutMs); + assertEquals(member.sessionTimeoutMs(), newSessionTimeoutMs); + assertEquals(newProtocols, member.supportedProtocols()); + } + + @Test + public void testReplaceGroupInstanceWithNonExistingMember() { + String newMemberId = "newMemberId"; + assertThrows(IllegalArgumentException.class, () -> + group.replaceStaticMember(groupInstanceId, memberId, newMemberId)); + } + + @Test + public void testReplaceGroupInstance() throws Exception { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.of(groupInstanceId), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + CompletableFuture joinGroupFuture = new CompletableFuture<>(); + group.add(member, joinGroupFuture); + + CompletableFuture syncGroupFuture = new CompletableFuture<>(); + member.setAwaitingSyncFuture(syncGroupFuture); + + assertTrue(group.isLeader(memberId)); + assertEquals(memberId, group.staticMemberId(groupInstanceId)); + + String newMemberId = "newMemberId"; + group.replaceStaticMember(groupInstanceId, memberId, newMemberId); + + assertTrue(group.isLeader(newMemberId)); + assertEquals(newMemberId, group.staticMemberId(groupInstanceId)); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), joinGroupFuture.get().errorCode()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), syncGroupFuture.get().errorCode()); + assertFalse(member.isAwaitingJoin()); + assertFalse(member.isAwaitingSync()); + } + + @Test + public void testCompleteJoinFuture() throws Exception { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + CompletableFuture joinGroupFuture = new CompletableFuture<>(); + group.add(member, joinGroupFuture); + + assertTrue(group.hasAllMembersJoined()); + assertTrue( + group.completeJoinFuture(member, new JoinGroupResponseData() + .setMemberId(member.memberId()) + .setErrorCode(Errors.NONE.code())) + ); + + assertEquals(Errors.NONE.code(), joinGroupFuture.get().errorCode()); + assertEquals(memberId, joinGroupFuture.get().memberId()); + assertFalse(member.isAwaitingJoin()); + assertEquals(0, group.numAwaitingJoinResponse()); + } + + @Test + public void testNotCompleteJoinFuture() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + + assertFalse(member.isAwaitingJoin()); + assertFalse( + group.completeJoinFuture(member, new JoinGroupResponseData() + .setMemberId(member.memberId()) + .setErrorCode(Errors.NONE.code())) + ); + + assertFalse(member.isAwaitingJoin()); + } + + @Test + public void testCompleteSyncFuture() throws Exception { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + CompletableFuture syncGroupFuture = new CompletableFuture<>(); + member.setAwaitingSyncFuture(syncGroupFuture); + + assertTrue(group.completeSyncFuture(member, new SyncGroupResponseData() + .setErrorCode(Errors.NONE.code()))); + + assertEquals(0, group.numAwaitingJoinResponse()); + + assertFalse(member.isAwaitingSync()); + assertEquals(Errors.NONE.code(), syncGroupFuture.get().errorCode()); + } + + @Test + public void testNotCompleteSyncFuture() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertFalse(member.isAwaitingSync()); + + assertFalse(group.completeSyncFuture(member, new SyncGroupResponseData() + .setErrorCode(Errors.NONE.code()))); + + assertFalse(member.isAwaitingSync()); + } + + @Test + public void testCannotAddPendingMemberIfStable() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertThrows(IllegalStateException.class, () -> group.addPendingMember(memberId)); + } + + @Test + public void testRemovalFromPendingAfterMemberIsStable() { + group.addPendingMember(memberId); + assertFalse(group.hasMemberId(memberId)); + assertTrue(group.isPendingMember(memberId)); + + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertTrue(group.hasMemberId(memberId)); + assertFalse(group.isPendingMember(memberId)); + } + + @Test + public void testRemovalFromPendingWhenMemberIsRemoved() { + group.addPendingMember(memberId); + assertFalse(group.hasMemberId(memberId)); + assertTrue(group.isPendingMember(memberId)); + + group.remove(memberId); + assertFalse(group.hasMemberId(memberId)); + assertFalse(group.isPendingMember(memberId)); + } + + @Test + public void testCannotAddStaticMemberIfAlreadyPresent() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.of(groupInstanceId), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertTrue(group.hasMemberId(memberId)); + assertTrue(group.hasStaticMember(groupInstanceId)); + + // We aren ot permitted to add the member again if it is already present + assertThrows(IllegalStateException.class, () -> group.add(member)); + } + + @Test + public void testCannotAddPendingSyncOfUnknownMember() { + assertThrows(IllegalStateException.class, + () -> group.addPendingSyncMember(memberId)); + } + + @Test + public void testCannotRemovePendingSyncOfUnknownMember() { + assertThrows(IllegalStateException.class, + () -> group.removePendingSyncMember(memberId)); + } + + @Test + public void testCanAddAndRemovePendingSyncMember() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertTrue(group.addPendingSyncMember(memberId)); + assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers()); + group.removePendingSyncMember(memberId); + assertEquals(Collections.emptySet(), group.allPendingSyncMembers()); + } + + @Test + public void testRemovalFromPendingSyncWhenMemberIsRemoved() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.of(groupInstanceId), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + assertTrue(group.addPendingSyncMember(memberId)); + assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers()); + group.remove(memberId); + assertEquals(Collections.emptySet(), group.allPendingSyncMembers()); + } + + @Test + public void testNewGenerationClearsPendingSyncMembers() { + GenericGroupMember member = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(member); + group.transitionTo(PREPARING_REBALANCE); + assertTrue(group.addPendingSyncMember(memberId)); + assertEquals(Collections.singleton(memberId), group.allPendingSyncMembers()); + group.initNextGeneration(); + assertEquals(Collections.emptySet(), group.allPendingSyncMembers()); + } + + @Test + public void testElectNewJoinedLeader() { + GenericGroupMember leader = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(leader); + assertTrue(group.isLeader(memberId)); + assertFalse(leader.isAwaitingJoin()); + + GenericGroupMember newLeader = new GenericGroupMember( + "new-leader", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + group.add(newLeader, new CompletableFuture<>()); + + GenericGroupMember newMember = new GenericGroupMember( + "new-member", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + group.add(newMember); + + assertTrue(group.maybeElectNewJoinedLeader()); + assertTrue(group.isLeader("new-leader")); + } + + @Test + public void testMaybeElectNewJoinedLeaderChooseExisting() { + GenericGroupMember leader = new GenericGroupMember( + memberId, + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + + group.add(leader, new CompletableFuture<>()); + assertTrue(group.isLeader(memberId)); + assertTrue(leader.isAwaitingJoin()); + + GenericGroupMember newMember = new GenericGroupMember( + "new-member", + Optional.empty(), + clientId, + clientHost, + rebalanceTimeoutMs, + sessionTimeoutMs, + protocolType, + Collections.singletonList( + new Protocol( + "roundrobin", + new byte[0] + ) + ) + ); + group.add(newMember); + + assertTrue(group.maybeElectNewJoinedLeader()); + assertTrue(group.isLeader(memberId)); + } + + private void assertState(GenericGroup group, GenericGroupState targetState) { + Set otherStates = new HashSet<>(); + otherStates.add(STABLE); + otherStates.add(PREPARING_REBALANCE); + otherStates.add(COMPLETING_REBALANCE); + otherStates.add(DEAD); + otherStates.remove(targetState); + + otherStates.forEach(otherState -> assertFalse(group.isInState(otherState))); + assertTrue(group.isInState(targetState)); + } +}