diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 214a06deb26..af271ee77bd 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -318,6 +318,12 @@ + + + + diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java new file mode 100644 index 00000000000..823368e65e0 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * ConsumerGroupMember contains all the information related to a member + * within a consumer group. This class is immutable and is fully backed + * by records stored in the __consumer_offsets topic. + */ +public class ConsumerGroupMember { + /** + * A builder allowing to create a new member or update an + * existing one. + * + * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the + * definition of the fields. + */ + public static class Builder { + private final String memberId; + private int memberEpoch = 0; + private int previousMemberEpoch = -1; + private int nextMemberEpoch = 0; + private String instanceId = null; + private String rackId = null; + private int rebalanceTimeoutMs = -1; + private String clientId = ""; + private String clientHost = ""; + private List subscribedTopicNames = Collections.emptyList(); + private String subscribedTopicRegex = ""; + private String serverAssignorName = null; + private List clientAssignors = Collections.emptyList(); + private Map> assignedPartitions = Collections.emptyMap(); + private Map> partitionsPendingRevocation = Collections.emptyMap(); + private Map> partitionsPendingAssignment = Collections.emptyMap(); + + public Builder(String memberId) { + this.memberId = Objects.requireNonNull(memberId); + } + + public Builder(ConsumerGroupMember member) { + Objects.requireNonNull(member); + + this.memberId = member.memberId; + this.memberEpoch = member.memberEpoch; + this.previousMemberEpoch = member.previousMemberEpoch; + this.nextMemberEpoch = member.nextMemberEpoch; + this.instanceId = member.instanceId; + this.rackId = member.rackId; + this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; + this.clientId = member.clientId; + this.clientHost = member.clientHost; + this.subscribedTopicNames = member.subscribedTopicNames; + this.subscribedTopicRegex = member.subscribedTopicRegex; + this.serverAssignorName = member.serverAssignorName; + this.clientAssignors = member.clientAssignors; + this.assignedPartitions = member.assignedPartitions; + this.partitionsPendingRevocation = member.partitionsPendingRevocation; + this.partitionsPendingAssignment = member.partitionsPendingAssignment; + } + + public Builder setMemberEpoch(int memberEpoch) { + this.memberEpoch = memberEpoch; + return this; + } + + public Builder setPreviousMemberEpoch(int previousMemberEpoch) { + this.previousMemberEpoch = previousMemberEpoch; + return this; + } + + public Builder setNextMemberEpoch(int nextMemberEpoch) { + this.nextMemberEpoch = nextMemberEpoch; + return this; + } + + public Builder setInstanceId(String instanceId) { + this.instanceId = instanceId; + return this; + } + + public Builder maybeUpdateInstanceId(Optional instanceId) { + this.instanceId = instanceId.orElse(this.instanceId); + return this; + } + + public Builder setRackId(String rackId) { + this.rackId = rackId; + return this; + } + + public Builder maybeUpdateRackId(Optional rackId) { + this.rackId = rackId.orElse(this.rackId); + return this; + } + + public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + return this; + } + + public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) { + this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs); + return this; + } + + public Builder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder setClientHost(String clientHost) { + this.clientHost = clientHost; + return this; + } + + public Builder setSubscribedTopicNames(List subscribedTopicNames) { + this.subscribedTopicNames = subscribedTopicNames; + this.subscribedTopicNames.sort(Comparator.naturalOrder()); + return this; + } + + public Builder maybeUpdateSubscribedTopicNames(Optional> subscribedTopicNames) { + this.subscribedTopicNames = subscribedTopicNames.orElse(this.subscribedTopicNames); + this.subscribedTopicNames.sort(Comparator.naturalOrder()); + return this; + } + + public Builder setSubscribedTopicRegex(String subscribedTopicRegex) { + this.subscribedTopicRegex = subscribedTopicRegex; + return this; + } + + public Builder maybeUpdateSubscribedTopicRegex(Optional subscribedTopicRegex) { + this.subscribedTopicRegex = subscribedTopicRegex.orElse(this.subscribedTopicRegex); + return this; + } + + public Builder setServerAssignorName(String serverAssignorName) { + this.serverAssignorName = serverAssignorName; + return this; + } + + public Builder maybeUpdateServerAssignorName(Optional serverAssignorName) { + this.serverAssignorName = serverAssignorName.orElse(this.serverAssignorName); + return this; + } + + public Builder setClientAssignors(List clientAssignors) { + this.clientAssignors = clientAssignors; + return this; + } + + public Builder maybeUpdateClientAssignors(Optional> clientAssignors) { + this.clientAssignors = clientAssignors.orElse(this.clientAssignors); + return this; + } + + public Builder setAssignedPartitions(Map> assignedPartitions) { + this.assignedPartitions = assignedPartitions; + return this; + } + + public Builder setPartitionsPendingRevocation(Map> partitionsPendingRevocation) { + this.partitionsPendingRevocation = partitionsPendingRevocation; + return this; + } + + public Builder setPartitionsPendingAssignment(Map> partitionsPendingAssignment) { + this.partitionsPendingAssignment = partitionsPendingAssignment; + return this; + } + + public Builder updateWith(ConsumerGroupMemberMetadataValue record) { + setInstanceId(record.instanceId()); + setRackId(record.rackId()); + setClientId(record.clientId()); + setClientHost(record.clientHost()); + setSubscribedTopicNames(record.subscribedTopicNames()); + setSubscribedTopicRegex(record.subscribedTopicRegex()); + setRebalanceTimeoutMs(record.rebalanceTimeoutMs()); + setServerAssignorName(record.serverAssignor()); + setClientAssignors(record.assignors().stream() + .map(ClientAssignor::fromRecord) + .collect(Collectors.toList())); + return this; + } + + public Builder updateWith(ConsumerGroupCurrentMemberAssignmentValue record) { + setMemberEpoch(record.memberEpoch()); + setPreviousMemberEpoch(record.previousMemberEpoch()); + setNextMemberEpoch(record.targetMemberEpoch()); + setAssignedPartitions(assignmentFromTopicPartitions(record.assignedPartitions())); + setPartitionsPendingRevocation(assignmentFromTopicPartitions(record.partitionsPendingRevocation())); + setPartitionsPendingAssignment(assignmentFromTopicPartitions(record.partitionsPendingAssignment())); + return this; + } + + private Map> assignmentFromTopicPartitions( + List topicPartitionsList + ) { + return topicPartitionsList.stream().collect(Collectors.toMap( + ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId, + topicPartitions -> Collections.unmodifiableSet(new HashSet<>(topicPartitions.partitions())))); + } + + public ConsumerGroupMember build() { + MemberState state; + if (!partitionsPendingRevocation.isEmpty()) { + state = MemberState.REVOKING; + } else if (!partitionsPendingAssignment.isEmpty()) { + state = MemberState.ASSIGNING; + } else { + state = MemberState.STABLE; + } + + return new ConsumerGroupMember( + memberId, + memberEpoch, + previousMemberEpoch, + nextMemberEpoch, + instanceId, + rackId, + rebalanceTimeoutMs, + clientId, + clientHost, + subscribedTopicNames, + subscribedTopicRegex, + serverAssignorName, + clientAssignors, + state, + assignedPartitions, + partitionsPendingRevocation, + partitionsPendingAssignment + ); + } + } + + public enum MemberState { + REVOKING("revoking"), + ASSIGNING("assigning"), + STABLE("stable"); + + private final String name; + + MemberState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + + /** + * The member id. + */ + private final String memberId; + + /** + * The current member epoch. + */ + private final int memberEpoch; + + /** + * The previous member epoch. + */ + private final int previousMemberEpoch; + + /** + * The next member epoch. This corresponds to the target + * assignment epoch used to compute the current assigned, + * revoking and assigning partitions. + */ + private final int nextMemberEpoch; + + /** + * The instance id provided by the member. + */ + private final String instanceId; + + /** + * The rack id provided by the member. + */ + private final String rackId; + + /** + * The rebalance timeout provided by the member. + */ + private final int rebalanceTimeoutMs; + + /** + * The client id reported by the member. + */ + private final String clientId; + + /** + * The host reported by the member. + */ + private final String clientHost; + + /** + * The list of subscriptions (topic names) configured by the member. + */ + private final List subscribedTopicNames; + + /** + * The subscription pattern configured by the member. + */ + private final String subscribedTopicRegex; + + /** + * The server side assignor selected by the member. + */ + private final String serverAssignorName; + + /** + * The states of the client side assignors of the member. + */ + private final List clientAssignors; + + /** + * The member state. + */ + private final MemberState state; + + /** + * The partitions assigned to this member. + */ + private final Map> assignedPartitions; + + /** + * The partitions being revoked by this member. + */ + private final Map> partitionsPendingRevocation; + + /** + * The partitions waiting to be assigned to this + * member. They will be assigned when they are + * released by their previous owners. + */ + private final Map> partitionsPendingAssignment; + + private ConsumerGroupMember( + String memberId, + int memberEpoch, + int previousMemberEpoch, + int nextMemberEpoch, + String instanceId, + String rackId, + int rebalanceTimeoutMs, + String clientId, + String clientHost, + List subscribedTopicNames, + String subscribedTopicRegex, + String serverAssignorName, + List clientAssignors, + MemberState state, + Map> assignedPartitions, + Map> partitionsPendingRevocation, + Map> partitionsPendingAssignment + ) { + this.memberId = memberId; + this.memberEpoch = memberEpoch; + this.previousMemberEpoch = previousMemberEpoch; + this.nextMemberEpoch = nextMemberEpoch; + this.instanceId = instanceId; + this.rackId = rackId; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.clientId = clientId; + this.clientHost = clientHost; + this.subscribedTopicNames = subscribedTopicNames; + this.subscribedTopicRegex = subscribedTopicRegex; + this.serverAssignorName = serverAssignorName; + this.clientAssignors = clientAssignors; + this.state = state; + this.assignedPartitions = assignedPartitions; + this.partitionsPendingRevocation = partitionsPendingRevocation; + this.partitionsPendingAssignment = partitionsPendingAssignment; + } + + /** + * @return The member id. + */ + public String memberId() { + return memberId; + } + + /** + * @return The current member epoch. + */ + public int memberEpoch() { + return memberEpoch; + } + + /** + * @return The previous member epoch. + */ + public int previousMemberEpoch() { + return previousMemberEpoch; + } + + /** + * @return The next member epoch. + */ + public int nextMemberEpoch() { + return nextMemberEpoch; + } + + /** + * @return The instance id. + */ + public String instanceId() { + return instanceId; + } + + /** + * @return The rack id. + */ + public String rackId() { + return rackId; + } + + /** + * @return The rebalance timeout in millis. + */ + public int rebalanceTimeoutMs() { + return rebalanceTimeoutMs; + } + + /** + * @return The client id. + */ + public String clientId() { + return clientId; + } + + /** + * @return The client host. + */ + public String clientHost() { + return clientHost; + } + + /** + * @return The list of subscribed topic names. + */ + public List subscribedTopicNames() { + return subscribedTopicNames; + } + + /** + * @return The regular expression based subscription. + */ + public String subscribedTopicRegex() { + return subscribedTopicRegex; + } + + /** + * @return The server side assignor or an empty optional. + */ + public Optional serverAssignorName() { + return Optional.ofNullable(serverAssignorName); + } + + /** + * @return The list of client side assignors. + */ + public List clientAssignors() { + return clientAssignors; + } + + /** + * @return The current state. + */ + public MemberState state() { + return state; + } + + /** + * @return The set of assigned partitions. + */ + public Map> assignedPartitions() { + return assignedPartitions; + } + + /** + * @return The set of partitions awaiting revocation from the member. + */ + public Map> partitionsPendingRevocation() { + return partitionsPendingRevocation; + } + + /** + * @return The set of partitions awaiting assigning to the member. + */ + public Map> partitionsPendingAssignment() { + return partitionsPendingAssignment; + } + + /** + * @return A string representation of the current assignment state. + */ + public String currentAssignmentSummary() { + return "CurrentAssignment(" + + ", memberEpoch=" + memberEpoch + + ", previousMemberEpoch=" + previousMemberEpoch + + ", nextMemberEpoch=" + nextMemberEpoch + + ", state=" + state + + ", assignedPartitions=" + assignedPartitions + + ", partitionsPendingRevocation=" + partitionsPendingRevocation + + ", partitionsPendingAssignment=" + partitionsPendingAssignment + + ')'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConsumerGroupMember that = (ConsumerGroupMember) o; + return memberEpoch == that.memberEpoch + && previousMemberEpoch == that.previousMemberEpoch + && nextMemberEpoch == that.nextMemberEpoch + && rebalanceTimeoutMs == that.rebalanceTimeoutMs + && Objects.equals(memberId, that.memberId) + && Objects.equals(instanceId, that.instanceId) + && Objects.equals(rackId, that.rackId) + && Objects.equals(clientId, that.clientId) + && Objects.equals(clientHost, that.clientHost) + && Objects.equals(subscribedTopicNames, that.subscribedTopicNames) + && Objects.equals(subscribedTopicRegex, that.subscribedTopicRegex) + && Objects.equals(serverAssignorName, that.serverAssignorName) + && Objects.equals(clientAssignors, that.clientAssignors) + && Objects.equals(assignedPartitions, that.assignedPartitions) + && Objects.equals(partitionsPendingRevocation, that.partitionsPendingRevocation) + && Objects.equals(partitionsPendingAssignment, that.partitionsPendingAssignment); + } + + @Override + public int hashCode() { + int result = memberId != null ? memberId.hashCode() : 0; + result = 31 * result + memberEpoch; + result = 31 * result + previousMemberEpoch; + result = 31 * result + nextMemberEpoch; + result = 31 * result + Objects.hashCode(instanceId); + result = 31 * result + Objects.hashCode(rackId); + result = 31 * result + rebalanceTimeoutMs; + result = 31 * result + Objects.hashCode(clientId); + result = 31 * result + Objects.hashCode(clientHost); + result = 31 * result + Objects.hashCode(subscribedTopicNames); + result = 31 * result + Objects.hashCode(subscribedTopicRegex); + result = 31 * result + Objects.hashCode(serverAssignorName); + result = 31 * result + Objects.hashCode(clientAssignors); + result = 31 * result + Objects.hashCode(assignedPartitions); + result = 31 * result + Objects.hashCode(partitionsPendingRevocation); + result = 31 * result + Objects.hashCode(partitionsPendingAssignment); + return result; + } + + @Override + public String toString() { + return "ConsumerGroupMember(" + + "memberId='" + memberId + '\'' + + ", memberEpoch=" + memberEpoch + + ", previousMemberEpoch=" + previousMemberEpoch + + ", nextMemberEpoch=" + nextMemberEpoch + + ", instanceId='" + instanceId + '\'' + + ", rackId='" + rackId + '\'' + + ", rebalanceTimeoutMs=" + rebalanceTimeoutMs + + ", clientId='" + clientId + '\'' + + ", clientHost='" + clientHost + '\'' + + ", subscribedTopicNames=" + subscribedTopicNames + + ", subscribedTopicRegex='" + subscribedTopicRegex + '\'' + + ", serverAssignorName='" + serverAssignorName + '\'' + + ", clientAssignors=" + clientAssignors + + ", state=" + state + + ", assignedPartitions=" + assignedPartitions + + ", partitionsPendingRevocation=" + partitionsPendingRevocation + + ", partitionsPendingAssignment=" + partitionsPendingAssignment + + ')'; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java new file mode 100644 index 00000000000..7df34650be8 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + + @Test + public void testNewMember() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setNextMemberEpoch(11) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setSubscribedTopicRegex("regex") + .setServerAssignorName("range") + .setClientAssignors(Collections.singletonList( + new ClientAssignor( + "assignor", + (byte) 0, + (byte) 0, + (byte) 1, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0))))) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId3, 7, 8, 9))) + .build(); + + assertEquals("member-id", member.memberId()); + assertEquals(10, member.memberEpoch()); + assertEquals(9, member.previousMemberEpoch()); + assertEquals(11, member.nextMemberEpoch()); + assertEquals("instance-id", member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("hostname", member.clientHost()); + // Names are sorted. + assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); + assertEquals("regex", member.subscribedTopicRegex()); + assertEquals("range", member.serverAssignorName().get()); + assertEquals( + Collections.singletonList( + new ClientAssignor( + "assignor", + (byte) 0, + (byte) 0, + (byte) 1, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0)))), + member.clientAssignors()); + assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); + assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); + assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); + } + + @Test + public void testEquals() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setNextMemberEpoch(11) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setSubscribedTopicRegex("regex") + .setServerAssignorName("range") + .setClientAssignors(Collections.singletonList( + new ClientAssignor( + "assignor", + (byte) 0, + (byte) 0, + (byte) 1, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0))))) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId3, 7, 8, 9))) + .build(); + + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setNextMemberEpoch(11) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setSubscribedTopicRegex("regex") + .setServerAssignorName("range") + .setClientAssignors(Collections.singletonList( + new ClientAssignor( + "assignor", + (byte) 0, + (byte) 0, + (byte) 1, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0))))) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId3, 7, 8, 9))) + .build(); + + assertEquals(member1, member2); + } + + @Test + public void testUpdateMember() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setNextMemberEpoch(11) + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(5000) + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setSubscribedTopicRegex("regex") + .setServerAssignorName("range") + .setClientAssignors(Collections.singletonList( + new ClientAssignor( + "assignor", + (byte) 0, + (byte) 0, + (byte) 1, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0))))) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId2, 4, 5, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId3, 7, 8, 9))) + .build(); + + // This is a no-op. + ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member) + .maybeUpdateRackId(Optional.empty()) + .maybeUpdateInstanceId(Optional.empty()) + .maybeUpdateServerAssignorName(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.empty()) + .maybeUpdateSubscribedTopicRegex(Optional.empty()) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty()) + .maybeUpdateClientAssignors(Optional.empty()) + .build(); + + assertEquals(member, updatedMember); + + updatedMember = new ConsumerGroupMember.Builder(member) + .maybeUpdateRackId(Optional.of("new-rack-id")) + .maybeUpdateInstanceId(Optional.of("new-instance-id")) + .maybeUpdateServerAssignorName(Optional.of("new-assignor")) + .maybeUpdateSubscribedTopicNames(Optional.of(Arrays.asList("zar"))) + .maybeUpdateSubscribedTopicRegex(Optional.of("new-regex")) + .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000)) + .maybeUpdateClientAssignors(Optional.of(Collections.emptyList())) + .build(); + + assertEquals("new-instance-id", updatedMember.instanceId()); + assertEquals("new-rack-id", updatedMember.rackId()); + // Names are sorted. + assertEquals(Arrays.asList("zar"), updatedMember.subscribedTopicNames()); + assertEquals("new-regex", updatedMember.subscribedTopicRegex()); + assertEquals("new-assignor", updatedMember.serverAssignorName().get()); + assertEquals(Collections.emptyList(), updatedMember.clientAssignors()); + } + + @Test + public void testUpdateWithConsumerGroupMemberMetadataValue() { + ConsumerGroupMemberMetadataValue record = new ConsumerGroupMemberMetadataValue() + .setAssignors(Collections.singletonList(new ConsumerGroupMemberMetadataValue.Assignor() + .setName("client") + .setMinimumVersion((short) 0) + .setMaximumVersion((short) 2) + .setVersion((short) 1) + .setMetadata(new byte[0]))) + .setServerAssignor("range") + .setClientId("client-id") + .setClientHost("host-id") + .setInstanceId("instance-id") + .setRackId("rack-id") + .setRebalanceTimeoutMs(1000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setSubscribedTopicRegex("regex"); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertEquals("instance-id", member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("host-id", member.clientHost()); + // Names are sorted. + assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); + assertEquals("regex", member.subscribedTopicRegex()); + assertEquals("range", member.serverAssignorName().get()); + assertEquals( + Collections.singletonList( + new ClientAssignor( + "client", + (byte) 0, + (byte) 0, + (byte) 2, + new VersionedMetadata( + (byte) 1, + ByteBuffer.allocate(0)))), + member.clientAssignors()); + } + + @Test + public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + Uuid topicId3 = Uuid.randomUuid(); + + ConsumerGroupCurrentMemberAssignmentValue record = new ConsumerGroupCurrentMemberAssignmentValue() + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setTargetMemberEpoch(11) + .setAssignedPartitions(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId1) + .setPartitions(Arrays.asList(0, 1, 2)))) + .setPartitionsPendingRevocation(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId2) + .setPartitions(Arrays.asList(3, 4, 5)))) + .setPartitionsPendingAssignment(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(topicId3) + .setPartitions(Arrays.asList(6, 7, 8)))); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertEquals(10, member.memberEpoch()); + assertEquals(9, member.previousMemberEpoch()); + assertEquals(11, member.nextMemberEpoch()); + assertEquals(mkAssignment(mkTopicAssignment(topicId1, 0, 1, 2)), member.assignedPartitions()); + assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation()); + assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), member.partitionsPendingAssignment()); + } +}