KAFKA-18621: Add StreamsCoordinatorRecordHelpers (#18669)

A class with helper methods to create records stored in the __consumer_offsets topic.

Compared to the feature branch, I added unit tests (most functions were not tested) and adopted the new interface for constructing coordinator records introduced by David.

Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2025-01-30 09:28:45 +01:00 committed by GitHub
parent 0dfc4017b8
commit 56e50120be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 1355 additions and 0 deletions

View File

@ -0,0 +1,473 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class contains helper methods to create records stored in the __consumer_offsets topic.
*/
public class StreamsCoordinatorRecordHelpers {
public static CoordinatorRecord newStreamsGroupMemberRecord(
String groupId,
StreamsGroupMember member
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(member, "member should not be null here");
return CoordinatorRecord.record(
new StreamsGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(member.memberId()),
new ApiMessageAndVersion(
new StreamsGroupMemberMetadataValue()
.setRackId(member.rackId().orElse(null))
.setInstanceId(member.instanceId().orElse(null))
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
.setTopologyEpoch(member.topologyEpoch())
.setProcessId(member.processId())
.setUserEndpoint(member.userEndpoint().orElse(null))
.setClientTags(member.clientTags().entrySet().stream().map(e ->
new StreamsGroupMemberMetadataValue.KeyValue()
.setKey(e.getKey())
.setValue(e.getValue())
).sorted(Comparator.comparing(StreamsGroupMemberMetadataValue.KeyValue::key)).collect(Collectors.toList())),
(short) 0
)
);
}
/**
* Creates a StreamsGroupMemberMetadata tombstone.
*
* @param groupId The streams group id.
* @param memberId The streams group member id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
String groupId,
String memberId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(memberId, "memberId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupMemberMetadataKey()
.setGroupId(groupId)
.setMemberId(memberId)
);
}
/**
* Creates a StreamsGroupPartitionMetadata record.
*
* @param groupId The streams group id.
* @param newPartitionMetadata The partition metadata.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupPartitionMetadataRecord(
String groupId,
Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> newPartitionMetadata
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(newPartitionMetadata, "newPartitionMetadata should not be null here");
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
newPartitionMetadata.forEach((topicName, topicMetadata) -> {
List<StreamsGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(racks.stream().sorted().toList())
)
);
}
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
.setPartitionMetadata(partitionMetadata)
);
});
value.topics().sort(Comparator.comparing(StreamsGroupPartitionMetadataValue.TopicMetadata::topicName));
return CoordinatorRecord.record(
new StreamsGroupPartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
value,
(short) 0
)
);
}
/**
* Creates a StreamsGroupPartitionMetadata tombstone.
*
* @param groupId The streams group id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord(
String groupId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupPartitionMetadataKey()
.setGroupId(groupId)
);
}
public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId,
int newGroupEpoch
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch),
(short) 0
)
);
}
/**
* Creates a StreamsGroupMetadata tombstone.
*
* @param groupId The streams group id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupEpochTombstoneRecord(
String groupId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupMetadataKey()
.setGroupId(groupId)
);
}
public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
String groupId,
String memberId,
TasksTuple assignment
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(memberId, "memberId should not be null here");
Objects.requireNonNull(assignment, "assignment should not be null here");
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds = new ArrayList<>(assignment.activeTasks().size());
for (Map.Entry<String, Set<Integer>> entry : assignment.activeTasks().entrySet()) {
activeTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(entry.getKey())
.setPartitions(entry.getValue().stream().sorted().toList())
);
}
activeTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds = new ArrayList<>(assignment.standbyTasks().size());
for (Map.Entry<String, Set<Integer>> entry : assignment.standbyTasks().entrySet()) {
standbyTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(entry.getKey())
.setPartitions(entry.getValue().stream().sorted().toList())
);
}
standbyTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds = new ArrayList<>(assignment.warmupTasks().size());
for (Map.Entry<String, Set<Integer>> entry : assignment.warmupTasks().entrySet()) {
warmupTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(entry.getKey())
.setPartitions(entry.getValue().stream().sorted().toList())
);
}
warmupTaskIds.sort(Comparator.comparing(StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId));
return CoordinatorRecord.record(
new StreamsGroupTargetAssignmentMemberKey()
.setGroupId(groupId)
.setMemberId(memberId),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMemberValue()
.setActiveTasks(activeTaskIds)
.setStandbyTasks(standbyTaskIds)
.setWarmupTasks(warmupTaskIds),
(short) 0
)
);
}
/**
* Creates a StreamsGroupTargetAssignmentMember tombstone.
*
* @param groupId The streams group id.
* @param memberId The streams group member id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupTargetAssignmentTombstoneRecord(
String groupId,
String memberId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(memberId, "memberId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupTargetAssignmentMemberKey()
.setGroupId(groupId)
.setMemberId(memberId)
);
}
public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
String groupId,
int assignmentEpoch
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.record(
new StreamsGroupTargetAssignmentMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMetadataValue()
.setAssignmentEpoch(assignmentEpoch),
(short) 0
)
);
}
/**
* Creates a StreamsGroupTargetAssignmentMetadata tombstone.
*
* @param groupId The streams group id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochTombstoneRecord(
String groupId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupTargetAssignmentMetadataKey()
.setGroupId(groupId)
);
}
public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
String groupId,
StreamsGroupMember member
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(member, "member should not be null here");
return CoordinatorRecord.record(
new StreamsGroupCurrentMemberAssignmentKey()
.setGroupId(groupId)
.setMemberId(member.memberId()),
new ApiMessageAndVersion(
new StreamsGroupCurrentMemberAssignmentValue()
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setState(member.state().value())
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
(short) 0
)
);
}
/**
* Creates a StreamsGroupCurrentMemberAssignment tombstone.
*
* @param groupId The streams group id.
* @param memberId The streams group member id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupCurrentAssignmentTombstoneRecord(
String groupId,
String memberId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(memberId, "memberId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupCurrentMemberAssignmentKey()
.setGroupId(groupId)
.setMemberId(memberId)
);
}
private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds(
Map<String, Set<Integer>> tasks
) {
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size());
tasks.forEach((subtopologyId, partitions) ->
taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(subtopologyId)
.setPartitions(partitions.stream().sorted().toList()))
);
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
return taskIds;
}
/**
* Creates a StreamsTopology record.
*
* @param groupId The consumer group id.
* @param topology The new topology.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId,
StreamsGroupHeartbeatRequestData.Topology topology) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(topology, "topology should not be null here");
return newStreamsGroupTopologyRecord(groupId, convertToStreamsGroupTopologyRecord(topology));
}
/**
* Creates a StreamsTopology record.
*
* @param groupId The consumer group id.
* @param value The encoded topology record value.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, StreamsGroupTopologyValue value) {
Objects.requireNonNull(groupId, "groupId should not be null here");
Objects.requireNonNull(value, "value should not be null here");
return CoordinatorRecord.record(
new StreamsGroupTopologyKey()
.setGroupId(groupId),
new ApiMessageAndVersion(value, (short) 0)
);
}
/**
* Encodes subtopologies from the Heartbeat RPC to a StreamsTopology record value.
*
* @param topology The new topology
* @return The record value.
*/
public static StreamsGroupTopologyValue convertToStreamsGroupTopologyRecord(StreamsGroupHeartbeatRequestData.Topology topology) {
Objects.requireNonNull(topology, "topology should not be null here");
StreamsGroupTopologyValue value = new StreamsGroupTopologyValue();
value.setEpoch(topology.epoch());
topology.subtopologies().forEach(subtopology -> {
List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics =
subtopology.repartitionSourceTopics().stream()
.map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
.collect(Collectors.toList());
List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics =
subtopology.stateChangelogTopics().stream()
.map(StreamsCoordinatorRecordHelpers::convertToTopicInfo)
.collect(Collectors.toList());
List<StreamsGroupTopologyValue.CopartitionGroup> copartitionGroups =
subtopology.copartitionGroups().stream()
.map(copartitionGroup -> new StreamsGroupTopologyValue.CopartitionGroup()
.setSourceTopics(copartitionGroup.sourceTopics())
.setSourceTopicRegex(copartitionGroup.sourceTopicRegex())
.setRepartitionSourceTopics(copartitionGroup.repartitionSourceTopics())
)
.collect(Collectors.toList());
value.subtopologies().add(
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(subtopology.subtopologyId())
.setSourceTopics(subtopology.sourceTopics())
.setSourceTopicRegex(subtopology.sourceTopicRegex())
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
.setRepartitionSourceTopics(repartitionSourceTopics)
.setStateChangelogTopics(stateChangelogTopics)
.setCopartitionGroups(copartitionGroups)
);
});
return value;
}
private static StreamsGroupTopologyValue.TopicInfo convertToTopicInfo(StreamsGroupHeartbeatRequestData.TopicInfo topicInfo) {
List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
.map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
.collect(Collectors.toList()) : null;
return new StreamsGroupTopologyValue.TopicInfo()
.setName(topicInfo.name())
.setTopicConfigs(topicConfigs)
.setPartitions(topicInfo.partitions())
.setReplicationFactor(topicInfo.replicationFactor());
}
/**
* Creates a StreamsGroupTopology tombstone.
*
* @param groupId The streams group id.
* @return The record.
*/
public static CoordinatorRecord newStreamsGroupTopologyRecordTombstone(
String groupId
) {
Objects.requireNonNull(groupId, "groupId should not be null here");
return CoordinatorRecord.tombstone(
new StreamsGroupTopologyKey()
.setGroupId(groupId)
);
}
}

View File

@ -0,0 +1,882 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue.TaskIds;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
class StreamsCoordinatorRecordHelpersTest {
public static final String CLIENT_HOST = "client-host";
public static final String CLIENT_ID = "client-id";
public static final String CONFIG_NAME_1 = "config-name1";
public static final String CONFIG_NAME_2 = "config-name2";
public static final String CONFIG_VALUE_1 = "config-value1";
public static final String CONFIG_VALUE_2 = "config-value2";
public static final String GROUP_ID = "group-id";
public static final String INSTANCE_ID = "instance-id";
public static final String MEMBER_ID = "member-id";
public static final String PROCESS_ID = "process-id";
public static final String RACK_1 = "rack1";
public static final String RACK_2 = "rack2";
public static final String RACK_3 = "rack3";
public static final String SUBTOPOLOGY_1 = "subtopology1";
public static final String SUBTOPOLOGY_2 = "subtopology2";
public static final String SUBTOPOLOGY_3 = "subtopology3";
public static final String TAG_1 = "tag1";
public static final String TAG_2 = "tag2";
public static final String TOPIC_1 = "topic1";
public static final String TOPIC_2 = "topic2";
public static final String TOPIC_BAR = "bar";
public static final String TOPIC_CHANGELOG = "changelog";
public static final String TOPIC_FOO = "foo";
public static final String TOPIC_REGEX = "regex";
public static final String TOPIC_REPARTITION = "repartition";
public static final String USER_ENDPOINT = "user-endpoint";
public static final String VALUE_1 = "value1";
public static final String VALUE_2 = "value2";
public static final int REBALANCE_TIMEOUT_MS = 1000;
public static final int USER_ENDPOINT_PORT = 40;
@Test
public void testNewStreamsGroupMemberRecord() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMemberMetadataKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupMemberMetadataValue()
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(List.of(
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
)),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupMemberRecordWithNullRackId() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(null)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMemberMetadataKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupMemberMetadataValue()
.setRackId(null)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(List.of(
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
)),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupMemberRecordWithNullInstanceId() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(RACK_1)
.setInstanceId(null)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMemberMetadataKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupMemberMetadataValue()
.setRackId(RACK_1)
.setInstanceId(null)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(List.of(
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
)),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupMemberRecordWithNullUserEndpoint() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(null)
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMemberMetadataKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupMemberMetadataValue()
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(null)
.setClientTags(List.of(
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_1).setValue(VALUE_1),
new StreamsGroupMemberMetadataValue.KeyValue().setKey(TAG_2).setValue(VALUE_2)
)),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupMemberTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupMemberMetadataKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(GROUP_ID, MEMBER_ID));
}
@Test
public void testNewStreamsGroupPartitionMetadataRecord() {
Uuid uuid1 = Uuid.randomUuid();
Uuid uuid2 = Uuid.randomUuid();
Map<String, TopicMetadata> newPartitionMetadata = Map.of(
TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0, Set.of(RACK_1, RACK_2))),
TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1, Set.of(RACK_3)))
);
StreamsGroupPartitionMetadataValue value = new StreamsGroupPartitionMetadataValue();
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid1)
.setTopicName(TOPIC_1)
.setNumPartitions(1)
.setPartitionMetadata(List.of(
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(0)
.setRacks(List.of(RACK_1, RACK_2))
))
);
value.topics().add(new StreamsGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(uuid2)
.setTopicName(TOPIC_2)
.setNumPartitions(2)
.setPartitionMetadata(List.of(
new StreamsGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(1)
.setRacks(List.of(RACK_3))
))
);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupPartitionMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(value, (short) 0)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(GROUP_ID, newPartitionMetadata));
}
@Test
public void testNewStreamsGroupPartitionMetadataTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupPartitionMetadataKey()
.setGroupId(GROUP_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(GROUP_ID));
}
@Test
public void testNewStreamsGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(42),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42));
}
@Test
public void testNewStreamsGroupEpochTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(GROUP_ID));
}
@Test
public void testNewStreamsGroupTargetAssignmentRecord() {
Map<String, Set<Integer>> activeTasks = Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3));
Map<String, Set<Integer>> standbyTasks = Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6));
Map<String, Set<Integer>> warmupTasks = Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9));
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupTargetAssignmentMemberKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMemberValue()
.setActiveTasks(List.of(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(List.of(1, 2, 3))
))
.setStandbyTasks(List.of(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(List.of(4, 5, 6))
))
.setWarmupTasks(List.of(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_3)
.setPartitions(List.of(7, 8, 9))
)),
(short) 0
)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID, MEMBER_ID,
new TasksTuple(activeTasks, standbyTasks, warmupTasks)));
}
@ParameterizedTest
@EnumSource(TaskRole.class)
public void testNewStreamsGroupTargetAssignmentRecordWithEmptyTaskIds(TaskRole taskRole) {
final StreamsGroupTargetAssignmentMemberValue targetAssignmentMemberValue = new StreamsGroupTargetAssignmentMemberValue();
final List<TaskIds> taskIds = List.of(new TaskIds().setSubtopologyId(SUBTOPOLOGY_1).setPartitions(List.of(1, 2, 3)));
switch (taskRole) {
case ACTIVE:
targetAssignmentMemberValue.setActiveTasks(taskIds);
break;
case STANDBY:
targetAssignmentMemberValue.setStandbyTasks(taskIds);
break;
case WARMUP:
targetAssignmentMemberValue.setWarmupTasks(taskIds);
break;
}
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupTargetAssignmentMemberKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
targetAssignmentMemberValue,
(short) 0
)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(GROUP_ID, MEMBER_ID,
mkTasksTuple(taskRole, mkTasks(SUBTOPOLOGY_1, 1, 2, 3))));
}
@Test
public void testNewStreamsGroupTargetAssignmentTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupTargetAssignmentMemberKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(GROUP_ID, MEMBER_ID));
}
@Test
public void testNewStreamsGroupTargetAssignmentEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupTargetAssignmentMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMetadataValue()
.setAssignmentEpoch(42),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(GROUP_ID, 42));
}
@Test
public void testNewStreamsGroupTargetAssignmentEpochTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupTargetAssignmentMetadataKey()
.setGroupId(GROUP_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(GROUP_ID));
}
@Test
public void testNewStreamsGroupCurrentAssignmentRecord() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.setAssignedTasks(new TasksTuple(
Map.of(
SUBTOPOLOGY_1, Set.of(1, 2, 3)
),
Map.of(
SUBTOPOLOGY_2, Set.of(4, 5, 6)
),
Map.of(
SUBTOPOLOGY_3, Set.of(7, 8, 9)
)
))
.setTasksPendingRevocation(new TasksTuple(
Map.of(
SUBTOPOLOGY_1, Set.of(1, 2, 3)
),
Map.of(
SUBTOPOLOGY_2, Set.of(4, 5, 6)
),
Map.of(
SUBTOPOLOGY_3, Set.of(7, 8, 9)
)
))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupCurrentMemberAssignmentKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupCurrentMemberAssignmentValue()
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE.value())
.setActiveTasks(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(List.of(1, 2, 3))
))
.setStandbyTasks(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(List.of(4, 5, 6))
))
.setWarmupTasks(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_3)
.setPartitions(List.of(7, 8, 9))
))
.setActiveTasksPendingRevocation(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_1)
.setPartitions(List.of(1, 2, 3))
))
.setStandbyTasksPendingRevocation(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_2)
.setPartitions(List.of(4, 5, 6))
))
.setWarmupTasksPendingRevocation(List.of(
new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
.setSubtopologyId(SUBTOPOLOGY_3)
.setPartitions(List.of(7, 8, 9))
)),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupCurrentAssignmentRecordWithEmptyAssignment() {
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
.setRackId(RACK_1)
.setInstanceId(INSTANCE_ID)
.setClientId(CLIENT_ID)
.setClientHost(CLIENT_HOST)
.setRebalanceTimeoutMs(REBALANCE_TIMEOUT_MS)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE)
.setTopologyEpoch(1)
.setProcessId(PROCESS_ID)
.setUserEndpoint(new Endpoint().setHost(USER_ENDPOINT).setPort(USER_ENDPOINT_PORT))
.setClientTags(Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2))
.setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
.setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of()))
.build();
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupCurrentMemberAssignmentKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID),
new ApiMessageAndVersion(
new StreamsGroupCurrentMemberAssignmentValue()
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE.value())
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setActiveTasksPendingRevocation(List.of())
.setStandbyTasksPendingRevocation(List.of())
.setWarmupTasksPendingRevocation(List.of()),
(short) 0
)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(GROUP_ID, member));
}
@Test
public void testNewStreamsGroupCurrentAssignmentTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupCurrentMemberAssignmentKey()
.setGroupId(GROUP_ID)
.setMemberId(MEMBER_ID)
);
assertEquals(expectedRecord,
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(GROUP_ID, MEMBER_ID));
}
@Test
public void testNewStreamsGroupTopologyRecord() {
StreamsGroupHeartbeatRequestData.Topology topology =
new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(42)
.setSubtopologies(
List.of(new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_1)
.setRepartitionSinkTopics(List.of(TOPIC_FOO))
.setSourceTopics(List.of(TOPIC_BAR))
.setSourceTopicRegex(List.of(TOPIC_REGEX))
.setRepartitionSourceTopics(
List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo()
.setName(TOPIC_REPARTITION)
.setPartitions(4)
.setReplicationFactor((short) 3)
.setTopicConfigs(List.of(
new StreamsGroupHeartbeatRequestData.KeyValue()
.setKey(CONFIG_NAME_1)
.setValue(CONFIG_VALUE_1)
))
)
)
.setStateChangelogTopics(
List.of(
new StreamsGroupHeartbeatRequestData.TopicInfo()
.setName(TOPIC_CHANGELOG)
.setReplicationFactor((short) 2)
.setTopicConfigs(List.of(
new StreamsGroupHeartbeatRequestData.KeyValue()
.setKey(CONFIG_NAME_2)
.setValue(CONFIG_VALUE_2)
))
)
)
.setCopartitionGroups(List.of(
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setSourceTopics(List.of((short) 0))
.setRepartitionSourceTopics(List.of((short) 0)),
new StreamsGroupHeartbeatRequestData.CopartitionGroup()
.setSourceTopicRegex(List.of((short) 0))
)),
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_1)
.setRepartitionSinkTopics(List.of())
.setSourceTopics(List.of(TOPIC_BAR))
.setSourceTopicRegex(List.of())
.setRepartitionSourceTopics(List.of())
.setStateChangelogTopics(List.of())
.setCopartitionGroups(List.of())
)
);
StreamsGroupTopologyValue expectedTopology =
new StreamsGroupTopologyValue()
.setEpoch(42)
.setSubtopologies(
List.of(new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_1)
.setRepartitionSinkTopics(List.of(TOPIC_FOO))
.setSourceTopics(List.of(TOPIC_BAR))
.setSourceTopicRegex(List.of(TOPIC_REGEX))
.setRepartitionSourceTopics(
List.of(
new StreamsGroupTopologyValue.TopicInfo()
.setName(TOPIC_REPARTITION)
.setPartitions(4)
.setReplicationFactor((short) 3)
.setTopicConfigs(List.of(
new StreamsGroupTopologyValue.TopicConfig()
.setKey(CONFIG_NAME_1)
.setValue(CONFIG_VALUE_1)
))
)
)
.setStateChangelogTopics(
List.of(
new StreamsGroupTopologyValue.TopicInfo()
.setName(TOPIC_CHANGELOG)
.setReplicationFactor((short) 2)
.setTopicConfigs(List.of(
new StreamsGroupTopologyValue.TopicConfig()
.setKey(CONFIG_NAME_2)
.setValue(CONFIG_VALUE_2)
))
)
)
.setCopartitionGroups(List.of(
new StreamsGroupTopologyValue.CopartitionGroup()
.setSourceTopics(List.of((short) 0))
.setRepartitionSourceTopics(List.of((short) 0)),
new StreamsGroupTopologyValue.CopartitionGroup()
.setSourceTopicRegex(List.of((short) 0))
)),
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(SUBTOPOLOGY_1)
.setRepartitionSinkTopics(List.of())
.setSourceTopics(List.of(TOPIC_BAR))
.setSourceTopicRegex(List.of())
.setRepartitionSourceTopics(List.of())
.setStateChangelogTopics(List.of())
.setCopartitionGroups(List.of())
)
);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupTopologyKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
expectedTopology,
(short) 0));
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(GROUP_ID, topology));
}
@Test
public void testNewStreamsGroupTopologyRecordTombstone() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
new StreamsGroupTopologyKey()
.setGroupId(GROUP_ID)
);
assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(GROUP_ID));
}
@Test
public void testNewStreamsGroupMemberRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(null, mock(StreamsGroupMember.class)));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupMemberRecordNullMember() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("groupId", null));
assertEquals("member should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupMemberTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(null, "memberId"));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupMemberTombstoneRecordNullMemberId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("groupId", null));
assertEquals("memberId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(null, Map.of()));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataRecordNullNewPartitionMetadata() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord("groupId", null));
assertEquals("newPartitionMetadata should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupPartitionMetadataTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(null));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupEpochTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(null));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(null, "memberId", mock(TasksTuple.class)));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentRecordNullMemberId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId", null, mock(TasksTuple.class)));
assertEquals("memberId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentRecordNullAssignment() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("groupId", "memberId", null));
assertEquals("assignment should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(null, "memberId"));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentTombstoneRecordNullMemberId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("groupId", null));
assertEquals("memberId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentEpochRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(null, 1));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTargetAssignmentEpochTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(null));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupCurrentAssignmentRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(null, mock(StreamsGroupMember.class)));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupCurrentAssignmentRecordNullMember() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord("groupId", null));
assertEquals("member should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupCurrentAssignmentTombstoneRecordNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(null, "memberId"));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupCurrentAssignmentTombstoneRecordNullMemberId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("groupId", null));
assertEquals("memberId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTopologyRecordWithValueNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null, mock(StreamsGroupTopologyValue.class)));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTopologyRecordWithTopologyNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(null, mock(StreamsGroupHeartbeatRequestData.Topology.class)));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTopologyRecordNullTopology() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId", (StreamsGroupHeartbeatRequestData.Topology) null));
assertEquals("topology should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTopologyRecordNullValue() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("groupId", (StreamsGroupTopologyValue) null));
assertEquals("value should not be null here", exception.getMessage());
}
@Test
public void testNewStreamsGroupTopologyRecordTombstoneNullGroupId() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(null));
assertEquals("groupId should not be null here", exception.getMessage());
}
@Test
public void testConvertToStreamsGroupTopologyRecordNullTopology() {
NullPointerException exception = assertThrows(NullPointerException.class, () ->
StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(null));
assertEquals("topology should not be null here", exception.getMessage());
}
}