Compare commits

...

4 Commits

Author SHA1 Message Date
Lan Ding 0c3800057c
Merge b9633dd96a into 4a5aa37169 2025-10-08 01:11:24 +08:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
CI / build (push) Waiting to run Details
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
lucliu1108 2938c4242e
KAFKA-19754: Add RPC-level integration test for StreamsGroupDescribeRequest (#20632)
CI / build (push) Waiting to run Details
Test the `StreamsGroupDescribeRequest` RPC and corresponding responses
for situations where
- `streams.version` not upgraded to 1
- `streams.version` enabled, multiple groups listening to the same
topic.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
2025-10-07 15:47:32 +02:00
Lan Ding b9633dd96a MINOR: change the field header name in ConsumerPerformance 2025-09-30 11:12:36 +08:00
5 changed files with 474 additions and 143 deletions

View File

@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupDescribeResponse.data.groups.asScala.toList
}
protected def streamsGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean = false,
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
streamsGroupDescribeResponse.data.groups.asScala.toList
}
protected def heartbeat(
groupId: String,
generationId: Int,
@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data
}
protected def streamsGroupHeartbeat(
groupId: String,
memberId: String = "",
memberEpoch: Int = 0,
rebalanceTimeoutMs: Int = -1,
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
topology: StreamsGroupHeartbeatRequestData.Topology = null,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): StreamsGroupHeartbeatResponseData = {
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setActiveTasks(activeTasks.asJava)
.setStandbyTasks(standbyTasks.asJava)
.setWarmupTasks(warmupTasks.asJava)
.setTopology(topology)
).build(version)
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
streamsGroupHeartbeatResponse.data
}
protected def leaveGroupWithNewProtocol(
groupId: String,
memberId: String

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api._
import scala.jdk.CollectionConverters._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.Assert.{assertEquals, assertTrue}
import java.lang.{Byte => JByte}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
)
def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
val expectedResponse = new StreamsGroupDescribeResponseData()
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
val admin = cluster.admin()
val topicName = "foo"
try {
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = topicName,
numPartitions = 3
)
TestUtils.waitUntilTrue(() => {
admin.listTopics().names().get().contains(topicName)
}, msg = s"Topic $topicName is not available to the group coordinator")
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
// grp-1 with 2 members
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-1",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-2",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription1 = streamsGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true
)
grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
}, msg = s"Could not create grp-1 with 2 members successfully")
// grp-2 with 2 members
TestUtils.waitUntilTrue(() => {
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-3",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-4",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription2 = streamsGroupDescribe(
groupIds = List("grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
}, msg = s"Could not create grp-2 with 2 members successfully")
// Send follow-up heartbeats until both groups are stable
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member1Response.memberId,
memberEpoch = grp1Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
topology = null
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member2Response.memberId,
memberEpoch = grp1Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
topology = null
)
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member1Response.memberId,
memberEpoch = grp2Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
topology = null
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member2Response.memberId,
memberEpoch = grp2Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
topology = null
)
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
actual.head.members.size == 2 && actual(1).members.size == 2
}, "Two groups did not stabilize with 2 members each in time")
// Test the describe request for both groups in stable state
for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = version.toShort
)
assertEquals(2, actual.size)
assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
for (describedGroup <- actual) {
assertEquals("Stable", describedGroup.groupState)
assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
// Verify topology
assertEquals(1, describedGroup.topology.epoch)
assertEquals(1, describedGroup.topology.subtopologies.size)
assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
// Verify members
assertEquals(2, describedGroup.members.size)
val expectedMemberIds = describedGroup.groupId match {
case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
}
val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
assertEquals(expectedMemberIds, actualMemberIds)
assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
describedGroup.members.asScala.foreach { member =>
assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
assertEquals(1, member.topologyEpoch)
assertEquals(member.targetAssignment, member.assignment)
assertEquals(clientId, member.clientId())
assertEquals(clientHost, member.clientHost())
}
// Verify all partitions 0, 1, 2 are assigned exactly once
val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
}.toList
assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
}
}
} finally{
admin.close()
}
}
private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
if (responseTasks == null) {
List.empty
} else {
responseTasks.asScala.map { responseTask =>
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(responseTask.subtopologyId)
.setPartitions(responseTask.partitions)
}.toList
}
}
}

View File

@ -15,13 +15,16 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
for (int replicaId : new int[] {3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -115,7 +115,7 @@ public class ConsumerPerformance {
}
protected static void printHeader(boolean showDetailedStats) {
String newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
String newFieldsInHeader = ", stop.the.world.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
if (!showDetailedStats)
System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
else