KAFKA-14462; [15/N] Make Result generic and rename it (#13793)

This patch makes the record type generic, moves the class to the runtime package and finally rename the class to follow the naming of the other classes in the runtime package.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2023-06-02 09:18:09 +02:00 committed by GitHub
parent f499662923
commit 7c3a2846d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 30 deletions

View File

@ -49,6 +49,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@ -457,7 +458,7 @@ public class GroupMetadataManager {
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
private Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
String groupId,
String memberId,
int memberEpoch,
@ -603,7 +604,7 @@ public class GroupMetadataManager {
response.setAssignment(createResponseAssignment(updatedMember));
}
return new Result<>(records, response);
return new CoordinatorResult<>(records, response);
}
/**
@ -614,7 +615,7 @@ public class GroupMetadataManager {
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
private Result<ConsumerGroupHeartbeatResponseData> consumerGroupLeave(
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
String groupId,
String memberId
) throws ApiException {
@ -647,7 +648,7 @@ public class GroupMetadataManager {
int groupEpoch = group.groupEpoch() + 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
return new Result<>(records, new ConsumerGroupHeartbeatResponseData()
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(-1)
);
@ -662,7 +663,7 @@ public class GroupMetadataManager {
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
public Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
RequestContext context,
ConsumerGroupHeartbeatRequestData request
) throws ApiException {

View File

@ -14,22 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
package org.apache.kafka.coordinator.group.runtime;
import java.util.List;
import java.util.Objects;
/**
* The result of an operation applied to a state machine. The result
* contains a list of {{@link Record}} and a response.
* contains a list of records and a response.
*
* @param <T> The type of the response.
* @param <U> The type of the record.
*/
class Result<T> {
public class CoordinatorResult<T, U> {
/**
* The records.
*/
private final List<Record> records;
private final List<U> records;
/**
* The response.
@ -42,8 +43,8 @@ class Result<T> {
* @param records A non-null list of records.
* @param response A non-null response.
*/
public Result(
List<Record> records,
public CoordinatorResult(
List<U> records,
T response
) {
this.records = Objects.requireNonNull(records);
@ -53,7 +54,7 @@ class Result<T> {
/**
* @return The list of records.
*/
public List<Record> records() {
public List<U> records() {
return records;
}
@ -69,7 +70,7 @@ class Result<T> {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Result<?> result = (Result<?>) o;
CoordinatorResult<?, ?> result = (CoordinatorResult<?, ?>) o;
if (!records.equals(result.records)) return false;
return response.equals(result.response);

View File

@ -58,6 +58,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
@ -307,7 +308,7 @@ public class GroupMetadataManagerTest {
.state();
}
public Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
) {
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
@ -328,7 +329,7 @@ public class GroupMetadataManagerTest {
false
);
Result<ConsumerGroupHeartbeatResponseData> result = groupMetadataManager.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = groupMetadataManager.consumerGroupHeartbeat(
context,
request
);
@ -492,7 +493,7 @@ public class GroupMetadataManagerTest {
Collections.emptyMap()
));
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("group-foo")
.setMemberEpoch(0)
@ -647,7 +648,7 @@ public class GroupMetadataManagerTest {
// Member joins with previous epoch and has a subset of the owned partitions. This
// is accepted as the response with the bumped epoch may have been lost. In this
// case, we provide back the correct epoch to the member.
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
@ -690,7 +691,7 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false));
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
@ -791,7 +792,7 @@ public class GroupMetadataManagerTest {
)))
));
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
@ -924,7 +925,7 @@ public class GroupMetadataManagerTest {
));
// Member 3 joins the consumer group.
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId3)
@ -1049,7 +1050,7 @@ public class GroupMetadataManagerTest {
.build();
// Member 2 leaves the consumer group.
Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
@ -1159,7 +1160,7 @@ public class GroupMetadataManagerTest {
}
));
Result<ConsumerGroupHeartbeatResponseData> result;
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result;
// Members in the group are in Stable state.
assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId1));
@ -1566,7 +1567,7 @@ public class GroupMetadataManagerTest {
.withAssignmentEpoch(10))
.build();
Result<ConsumerGroupHeartbeatResponseData> result;
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result;
// Prepare new assignment for the group.
assignor.prepareGroupAssignment(new GroupAssignment(

View File

@ -14,8 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.coordinator.group.Record;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@ -23,24 +24,24 @@ import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ResultTest {
public class CoordinatorResultTest {
@Test
public void testAttributes() {
Result<String> result = new Result<>(Collections.emptyList(), "response");
CoordinatorResult<String, Record> result = new CoordinatorResult<>(Collections.emptyList(), "response");
assertEquals(Collections.emptyList(), result.records());
assertEquals("response", result.response());
}
@Test
public void testAttributesCannotBeNull() {
assertThrows(NullPointerException.class, () -> new Result<>(Collections.emptyList(), null));
assertThrows(NullPointerException.class, () -> new Result<>(null, "response"));
assertThrows(NullPointerException.class, () -> new CoordinatorResult<>(Collections.emptyList(), null));
assertThrows(NullPointerException.class, () -> new CoordinatorResult<>(null, "response"));
}
@Test
public void testEquals() {
Result<String> result1 = new Result<>(Collections.emptyList(), "response");
Result<String> result2 = new Result<>(Collections.emptyList(), "response");
CoordinatorResult<String, Record> result1 = new CoordinatorResult<>(Collections.emptyList(), "response");
CoordinatorResult<String, Record> result2 = new CoordinatorResult<>(Collections.emptyList(), "response");
assertEquals(result1, result2);
}
}