KAFKA-17631 Convert SaslApiVersionsRequestTest to kraft (#18330)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-02-03 21:01:38 +08:00 committed by GitHub
parent 9ba2621620
commit f6f41dc5eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 26 additions and 58 deletions

View File

@ -16,81 +16,48 @@
*/ */
package kafka.server package kafka.server
import kafka.api.SaslSetup
import kafka.security.JaasTestUtils
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
import org.apache.kafka.common.test.api.{ClusterConfig, ClusterTemplate, Type}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.test.api.{ClusterTest, Type}
import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.common.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled} import org.junit.jupiter.api.extension.ExtendWith
import java.net.Socket import java.net.Socket
import java.util.Collections import java.util.Collections
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
object SaslApiVersionsRequestTest { @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
val kafkaClientSaslMechanism = "PLAIN"
val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
val controlPlaneListenerName = "CONTROL_PLANE"
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
val saslServerProperties = new java.util.HashMap[String, String]()
saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
val saslClientProperties = new java.util.HashMap[String, String]()
saslClientProperties.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
List(ClusterConfig.defaultBuilder
.setBrokerSecurityProtocol(securityProtocol)
.setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
.setServerProperties(serverProperties)
.build()).asJava
}
}
@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _
@BeforeEach @ClusterTest(types = Array(Type.KRAFT),
def setupSasl(): Unit = { brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
sasl = new SaslSetup() {} controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) )
}
@ClusterTemplate("saslApiVersionsRequestClusterConfig")
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = { def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try { try {
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket) new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse, validateApiVersionsResponse(
apiVersionsResponse,
enableUnstableLastVersion = !"false".equals( enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable"))) cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket) sendSaslHandshakeRequestValidateResponse(socket)
} finally { } finally {
socket.close() socket.close()
} }
} }
@ClusterTemplate("saslApiVersionsRequestClusterConfig") @ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = { def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try { try {
@ -103,7 +70,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
} }
} }
@ClusterTemplate("saslApiVersionsRequestClusterConfig") @ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = { def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try { try {
@ -112,20 +82,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode) assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket) new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse2, validateApiVersionsResponse(
apiVersionsResponse2,
enableUnstableLastVersion = !"false".equals( enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable"))) cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket) sendSaslHandshakeRequestValidateResponse(socket)
} finally { } finally {
socket.close() socket.close()
} }
} }
@AfterEach
def closeSasl(): Unit = {
sasl.closeSasl()
}
private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = { private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"), val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion) ApiKeys.SASL_HANDSHAKE.latestVersion)