diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 3a8042e2590..7f1f8b6aae3 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -16,81 +16,48 @@ */ package kafka.server -import kafka.api.SaslSetup -import kafka.security.JaasTestUtils -import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} -import org.apache.kafka.common.test.api.{ClusterConfig, ClusterTemplate, Type} -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.test.api.{ClusterTest, Type} import org.apache.kafka.common.test.ClusterInstance -import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.common.test.junit.ClusterTestExtensions import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled} +import org.junit.jupiter.api.extension.ExtendWith import java.net.Socket import java.util.Collections import scala.jdk.CollectionConverters._ -object SaslApiVersionsRequestTest { - val kafkaClientSaslMechanism = "PLAIN" - val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN") - val controlPlaneListenerName = "CONTROL_PLANE" - val securityProtocol = SecurityProtocol.SASL_PLAINTEXT - - def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = { - val saslServerProperties = new java.util.HashMap[String, String]() - saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism) - saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) - - val saslClientProperties = new java.util.HashMap[String, String]() - saslClientProperties.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) - - // Configure control plane listener to make sure we have separate listeners for testing. - val serverProperties = new java.util.HashMap[String, String]() - serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol") - serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") - serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") - - List(ClusterConfig.defaultBuilder - .setBrokerSecurityProtocol(securityProtocol) - .setTypes(Set(Type.KRAFT).asJava) - .setSaslServerProperties(saslServerProperties) - .setSaslClientProperties(saslClientProperties) - .setServerProperties(serverProperties) - .build()).asJava - } -} - -@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft") +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - private var sasl: SaslSetup = _ - @BeforeEach - def setupSasl(): Unit = { - sasl = new SaslSetup() {} - sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) - } - - @ClusterTemplate("saslApiVersionsRequestClusterConfig") + @ClusterTest(types = Array(Type.KRAFT), + brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT, + controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT + ) def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = { val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) try { val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - validateApiVersionsResponse(apiVersionsResponse, + validateApiVersionsResponse( + apiVersionsResponse, enableUnstableLastVersion = !"false".equals( - cluster.config().serverProperties().get("unstable.api.versions.enable"))) + cluster.config().serverProperties().get("unstable.api.versions.enable")), + apiVersion = 0.toShort + ) sendSaslHandshakeRequestValidateResponse(socket) } finally { socket.close() } } - @ClusterTemplate("saslApiVersionsRequestClusterConfig") + @ClusterTest(types = Array(Type.KRAFT), + brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT, + controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT + ) def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = { val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) try { @@ -103,7 +70,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe } } - @ClusterTemplate("saslApiVersionsRequestClusterConfig") + @ClusterTest(types = Array(Type.KRAFT), + brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT, + controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT + ) def testApiVersionsRequestWithUnsupportedVersion(): Unit = { val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) try { @@ -112,20 +82,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode) val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - validateApiVersionsResponse(apiVersionsResponse2, + validateApiVersionsResponse( + apiVersionsResponse2, enableUnstableLastVersion = !"false".equals( - cluster.config().serverProperties().get("unstable.api.versions.enable"))) + cluster.config().serverProperties().get("unstable.api.versions.enable")), + apiVersion = 0.toShort + ) sendSaslHandshakeRequestValidateResponse(socket) } finally { socket.close() } } - @AfterEach - def closeSasl(): Unit = { - sasl.closeSasl() - } - private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = { val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"), ApiKeys.SASL_HANDSHAKE.latestVersion)