Compare commits

...

5 Commits

Author SHA1 Message Date
Fan Yang 08a4dd91cd
Merge 399f5bc19e into 4a5aa37169 2025-10-07 14:33:27 -07:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
CI / build (push) Waiting to run Details
This commit moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`testRemoveAndAddVoterWithValidClusterId` and
`testRemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
Fan Yang 399f5bc19e Add UT 2025-09-25 09:53:14 +08:00
Fan Yang dd8beb4bbf Add UT 2025-09-22 09:59:41 +08:00
Fan Yang fe4f3e4508 KAFKA-19697: Fix unregistering connectorStatusMetrics NPE if tasks fail to start 2025-09-20 18:23:28 +08:00
4 changed files with 139 additions and 140 deletions

View File

@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -2394,6 +2394,10 @@ public final class Worker {
} }
protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) { protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
if (!connectorStatusMetrics.containsKey(connectorTaskId.connector())) {
return;
}
// Unregister connector task count metric if we remove the last task of the connector // Unregister connector task count metric if we remove the last task of the connector
if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) { if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) {
connectorStatusMetrics.get(connectorTaskId.connector()).close(); connectorStatusMetrics.get(connectorTaskId.connector()).close();

View File

@ -146,6 +146,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG; import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -898,6 +899,35 @@ public class WorkerTest {
verifyKafkaClusterId(); verifyKafkaClusterId();
} }
/**
 * Builds a {@code Worker.ConnectorStatusMetricsGroup} over an empty task map, records two tasks
 * of connector "c1" as added, then records both as removed and asserts that the second removal
 * does not throw.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) {
    setup(enableTopicCreation);
    mockKafkaClusterId();
    mockInternalConverters();
    mockFileConfigProvider();

    worker = new Worker(
        WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
    worker.herder = herder;

    final ConnectorTaskId firstTask = new ConnectorTaskId("c1", 0);
    final ConnectorTaskId secondTask = new ConnectorTaskId("c1", 1);

    // The backing task map stays empty to simulate all tasks failing to start.
    final Worker.ConnectorStatusMetricsGroup statusMetrics =
        new Worker.ConnectorStatusMetricsGroup(worker.metrics(), new ConcurrentHashMap<>(), herder);

    statusMetrics.recordTaskAdded(firstTask);
    statusMetrics.recordTaskAdded(secondTask);
    statusMetrics.recordTaskRemoved(firstTask);

    // Removing the second task of the same connector must still complete normally.
    assertDoesNotThrow(() -> statusMetrics.recordTaskRemoved(secondTask), "should not throw NPE");
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void testStartTaskFailure(boolean enableTopicCreation) { public void testStartTaskFailure(boolean enableTopicCreation) {

View File

@ -15,13 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package kafka.server; package org.apache.kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata; import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo; import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint; import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes; import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.test.api.TestKitDefaults;
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest { public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception { static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get(); FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) { ).build()) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel()); checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
}); });
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) { ).setStandalone(true).build()) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel()); checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
}); });
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
) { ) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin); Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
) { ) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin); Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet()); assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
) { ) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin); Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
) { ) {
cluster.format(); cluster.format();
cluster.startup(); cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) { try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin); Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
} }
} }
} }
/**
 * Removes voter 3000 and adds it back, passing the cluster id "test-cluster" in both requests,
 * and checks the voter set reported by {@code findVoterDirs} before and after the removal.
 */
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
    final var testNodes = new TestKitNodes.Builder()
        .setClusterId("test-cluster")
        .setNumBrokerNodes(1)
        .setNumControllerNodes(3)
        .build();

    // Seed the initial voter set with every controller's metadata directory id.
    final Map<Integer, Uuid> voterSet = new HashMap<>();
    testNodes.controllerNodes().values().forEach(
        controller -> voterSet.put(controller.id(), controller.metadataDirectoryId()));

    try (var cluster = new KafkaClusterTestKit.Builder(testNodes).setInitialVoterSet(voterSet).build()) {
        cluster.format();
        cluster.startup();
        try (var admin = Admin.create(cluster.clientProperties())) {
            final Set<Integer> allVoters = Set.of(3000, 3001, 3002);
            TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                final Map<Integer, Uuid> current = findVoterDirs(admin);
                assertEquals(allVoters, current.keySet());
                allVoters.forEach(id -> assertNotEquals(Uuid.ZERO_UUID, current.get(id)));
            });

            final Optional<String> clusterId = Optional.of("test-cluster");
            final Uuid removedDirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
            admin.removeRaftVoter(
                3000,
                removedDirId,
                new RemoveRaftVoterOptions().setClusterId(clusterId)
            ).all().get();

            final Set<Integer> remainingVoters = Set.of(3001, 3002);
            TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                final Map<Integer, Uuid> current = findVoterDirs(admin);
                assertEquals(remainingVoters, current.keySet());
                remainingVoters.forEach(id -> assertNotEquals(Uuid.ZERO_UUID, current.get(id)));
            });

            admin.addRaftVoter(
                3000,
                removedDirId,
                Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
                new AddRaftVoterOptions().setClusterId(clusterId)
            ).all().get();
        }
    }
}
/**
 * Issues a remove and then an add request for voter 3000 with the cluster id "inconsistent"
 * and expects both futures to fail with {@code InconsistentClusterIdException}.
 */
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
    final var testNodes = new TestKitNodes.Builder()
        .setClusterId("test-cluster")
        .setNumBrokerNodes(1)
        .setNumControllerNodes(3)
        .build();

    // Seed the initial voter set with every controller's metadata directory id.
    final Map<Integer, Uuid> voterSet = new HashMap<>();
    testNodes.controllerNodes().values().forEach(
        controller -> voterSet.put(controller.id(), controller.metadataDirectoryId()));

    try (var cluster = new KafkaClusterTestKit.Builder(testNodes).setInitialVoterSet(voterSet).build()) {
        cluster.format();
        cluster.startup();
        try (var admin = Admin.create(cluster.clientProperties())) {
            final Uuid targetDirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();

            // The removal is submitted and checked before the add request is sent.
            TestUtils.assertFutureThrows(
                InconsistentClusterIdException.class,
                admin.removeRaftVoter(
                    3000,
                    targetDirId,
                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
                ).all());

            TestUtils.assertFutureThrows(
                InconsistentClusterIdException.class,
                admin.addRaftVoter(
                    3000,
                    targetDirId,
                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
                ).all());
        }
    }
}
} }