KAFKA-14934 KafkaClusterTestKit makes FaultHandler accessible (#17774)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-11-18 18:23:54 +08:00 committed by GitHub
parent 381fbc1359
commit 53d8316b5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 80 additions and 65 deletions

View File

@ -34,7 +34,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.RaftClusterInvocationContext;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
import org.apache.kafka.test.TestUtils;
@ -52,7 +51,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -67,8 +65,6 @@ public class LogManagerIntegrationTest {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
RaftClusterInvocationContext.RaftClusterInstance raftInstance =
(RaftClusterInvocationContext.RaftClusterInstance) cluster;
try (Admin admin = cluster.admin()) {
admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get();
@ -76,12 +72,12 @@ public class LogManagerIntegrationTest {
cluster.waitForTopic("foo", 1);
Optional<PartitionMetadataFile> partitionMetadataFile = Optional.ofNullable(
raftInstance.brokers().get(0).logManager()
cluster.brokers().get(0).logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.partitionMetadataFile().getOrElse(null));
assertTrue(partitionMetadataFile.isPresent());
raftInstance.brokers().get(0).shutdown();
cluster.brokers().get(0).shutdown();
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(Collections.singletonList("foo"))
@ -93,9 +89,9 @@ public class LogManagerIntegrationTest {
// delete the partition.metadata file here to simulate the scenario where partition.metadata has not been flushed to disk yet
partitionMetadataFile.get().delete();
assertFalse(partitionMetadataFile.get().exists());
raftInstance.brokers().get(0).startup();
cluster.brokers().get(0).startup();
// make sure there is no error during load logs
assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());
assertTrue(cluster.firstFatalException().isEmpty());
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(Collections.singletonList("foo"))

View File

@ -21,7 +21,6 @@ import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.test.api.RaftClusterInvocationContext.RaftClusterInstance
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
@ -35,11 +34,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testAllocateProducersIdSentToController(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
val controllerServer = raftCluster.controllers.values().stream()
val controllerServer = cluster.controllers.values().stream()
.filter(_.config.nodeId == controllerId)
.findFirst()
.get()
@ -52,11 +50,10 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
@ClusterTest(controllers = 3)
def testAllocateProducersIdSentToNonController(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val sourceBroker = raftCluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer]
val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
val controllerServer = raftCluster.controllers().values().stream()
val controllerServer = cluster.controllers().values().stream()
.filter(_.config.nodeId != controllerId)
.findFirst()
.get()

View File

@ -17,7 +17,6 @@
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
@ -78,7 +77,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -86,8 +84,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -170,7 +168,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -178,8 +175,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -214,7 +211,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -222,8 +218,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -256,7 +252,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
try {
val instanceId = "instanceId"
@ -265,8 +260,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request so that a static member joins the group
@ -381,7 +376,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
)
)
def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
try {
val instanceId = "instanceId"
@ -390,8 +384,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -496,7 +490,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
)
)
def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
try {
val newHeartbeatIntervalMs = 10000
@ -507,8 +500,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -569,7 +562,6 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -577,8 +569,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
@ -603,15 +595,14 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(

View File

@ -17,7 +17,6 @@
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitForAllPartitionsMetadata
import org.apache.kafka.clients.admin.{Admin, NewPartitions}
@ -61,7 +60,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testShareGroupHeartbeatIsAccessibleWhenShareGroupIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -69,8 +67,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -158,7 +156,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testShareGroupHeartbeatWithMultipleMembers(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -166,8 +163,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -306,7 +303,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testMemberLeavingAndRejoining(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -314,8 +310,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -423,15 +419,14 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testPartitionAssignmentWithChangingTopics(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to a nonexistent topic.
@ -569,8 +564,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = "foo",
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
@ -617,7 +612,6 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "group.share.min.session.timeout.ms", value = "501")
))
def testMemberJoiningAndExpiring(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
@ -625,8 +619,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
@ -791,15 +785,14 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testGroupCoordinatorChange(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
brokers = cluster.brokers.values().asScala.toSeq,
controllers = cluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to a nonexistent topic.
@ -852,9 +845,9 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
// Restart the only running broker.
val broker = raftCluster.brokers().values().iterator().next()
raftCluster.shutdownBroker(broker.config.brokerId)
raftCluster.startBroker(broker.config.brokerId)
val broker = cluster.brokers().values().iterator().next()
cluster.shutdownBroker(broker.config.brokerId)
cluster.startBroker(broker.config.brokerId)
// Prepare the next heartbeat for member with no updates.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(

View File

@ -32,8 +32,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class KafkaClusterTestKitTest {
@ParameterizedTest
@ -138,4 +140,17 @@ public class KafkaClusterTestKitTest {
assertTrue(Paths.get(broker.metadataDirectory()).startsWith(baseDirectory)));
}
}
@Test
public void testExposedFaultHandlers() {
// Verifies that KafkaClusterTestKit exposes its fault handlers (the point of this commit):
// a minimal 1-broker/1-controller cluster must report non-null fatal and non-fatal handlers.
TestKitNodes nodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.build();
// try-with-resources ensures the cluster is closed even if an assertion fails.
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
assertNotNull(cluster.fatalFaultHandler(), "Fatal fault handler should not be null");
assertNotNull(cluster.nonFatalFaultHandler(), "Non-fatal fault handler should not be null");
} catch (Exception e) {
// Building or closing the cluster should not throw; treat any exception as a test failure.
fail("Failed to initialize cluster", e);
}
}
}

View File

@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import java.io.File;
@ -210,6 +211,17 @@ public interface ClusterInstance {
}
}
/**
* Returns the first recorded fatal exception, if any.
*/
Optional<FaultHandlerException> firstFatalException();
/**
* Returns the first recorded non-fatal exception, if any.
*/
Optional<FaultHandlerException> firstNonFatalException();
//---------------------------[modify]---------------------------//
void start();

View File

@ -32,6 +32,7 @@ import org.apache.kafka.metadata.storage.FormatterException;
import org.apache.kafka.server.common.FeatureVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
@ -97,7 +98,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
);
}
public static class RaftClusterInstance implements ClusterInstance {
private static class RaftClusterInstance implements ClusterInstance {
private final ClusterConfig clusterConfig;
final AtomicBoolean started = new AtomicBoolean(false);
@ -202,6 +203,16 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
findBrokerOrThrow(brokerId).startup();
}
@Override
// Delegates to the underlying KafkaClusterTestKit's fatal fault handler; empty when no fatal fault was recorded.
public Optional<FaultHandlerException> firstFatalException() {
return Optional.ofNullable(clusterTestKit.fatalFaultHandler().firstException());
}
@Override
// Delegates to the underlying KafkaClusterTestKit's non-fatal fault handler; empty when no non-fatal fault was recorded.
public Optional<FaultHandlerException> firstNonFatalException() {
return Optional.ofNullable(clusterTestKit.nonFatalFaultHandler().firstException());
}
@Override
public void waitForReadyBrokers() throws InterruptedException {
try {