mirror of https://github.com/apache/kafka.git
KAFKA-14084: SCRAM support in KRaft. (#13114)
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller, and adding support for SCRAM to MetadataImage and MetadataDelta. Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter) rather than a mapping between name and a list. This will avoid creating an excessively large record if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later. This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache on the controller (as opposed to broker). We will implement these in follow-on commits. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
47450ee064
commit
38c409cf33
|
@ -256,6 +256,7 @@
|
||||||
<allow pkg="org.apache.kafka.image.writer" />
|
<allow pkg="org.apache.kafka.image.writer" />
|
||||||
<allow pkg="org.apache.kafka.metadata" />
|
<allow pkg="org.apache.kafka.metadata" />
|
||||||
<allow pkg="org.apache.kafka.queue" />
|
<allow pkg="org.apache.kafka.queue" />
|
||||||
|
<allow pkg="org.apache.kafka.clients.admin" />
|
||||||
<allow pkg="org.apache.kafka.raft" />
|
<allow pkg="org.apache.kafka.raft" />
|
||||||
<allow pkg="org.apache.kafka.server.common" />
|
<allow pkg="org.apache.kafka.server.common" />
|
||||||
<allow pkg="org.apache.kafka.server.fault" />
|
<allow pkg="org.apache.kafka.server.fault" />
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
{
|
{
|
||||||
"apiKey": 51,
|
"apiKey": 51,
|
||||||
"type": "request",
|
"type": "request",
|
||||||
"listeners": ["zkBroker"],
|
"listeners": ["zkBroker", "broker", "controller"],
|
||||||
"name": "AlterUserScramCredentialsRequest",
|
"name": "AlterUserScramCredentialsRequest",
|
||||||
"validVersions": "0",
|
"validVersions": "0",
|
||||||
"flexibleVersions": "0+",
|
"flexibleVersions": "0+",
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
{
|
{
|
||||||
"apiKey": 50,
|
"apiKey": 50,
|
||||||
"type": "request",
|
"type": "request",
|
||||||
"listeners": ["zkBroker"],
|
"listeners": ["zkBroker", "broker", "controller"],
|
||||||
"name": "DescribeUserScramCredentialsRequest",
|
"name": "DescribeUserScramCredentialsRequest",
|
||||||
"validVersions": "0",
|
"validVersions": "0",
|
||||||
"flexibleVersions": "0+",
|
"flexibleVersions": "0+",
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package kafka.security
|
package kafka.security
|
||||||
|
|
||||||
import java.util.{Collection, Properties}
|
import java.util.{Collection, Properties}
|
||||||
|
import org.apache.kafka.clients.admin.{ScramMechanism => AdminScramMechanism}
|
||||||
import org.apache.kafka.common.security.authenticator.CredentialCache
|
import org.apache.kafka.common.security.authenticator.CredentialCache
|
||||||
import org.apache.kafka.common.security.scram.ScramCredential
|
import org.apache.kafka.common.security.scram.ScramCredential
|
||||||
import org.apache.kafka.common.config.ConfigDef
|
import org.apache.kafka.common.config.ConfigDef
|
||||||
|
@ -42,6 +42,25 @@ class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: De
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def updateCredential(
|
||||||
|
mechanism: AdminScramMechanism,
|
||||||
|
name: String,
|
||||||
|
credential: ScramCredential
|
||||||
|
): Unit = {
|
||||||
|
val cache = credentialCache.cache(mechanism.mechanismName(), classOf[ScramCredential])
|
||||||
|
cache.put(name, credential)
|
||||||
|
}
|
||||||
|
|
||||||
|
def removeCredentials(
|
||||||
|
mechanism: AdminScramMechanism,
|
||||||
|
name: String
|
||||||
|
): Unit = {
|
||||||
|
val cache = credentialCache.cache(mechanism.mechanismName(), classOf[ScramCredential])
|
||||||
|
if (cache != null) {
|
||||||
|
cache.remove(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object CredentialProvider {
|
object CredentialProvider {
|
||||||
|
|
|
@ -457,6 +457,7 @@ class BrokerServer(
|
||||||
dynamicConfigHandlers.toMap,
|
dynamicConfigHandlers.toMap,
|
||||||
"broker"),
|
"broker"),
|
||||||
authorizer,
|
authorizer,
|
||||||
|
credentialProvider,
|
||||||
sharedServer.initialBrokerMetadataLoadFaultHandler,
|
sharedServer.initialBrokerMetadataLoadFaultHandler,
|
||||||
sharedServer.metadataPublishingFaultHandler)
|
sharedServer.metadataPublishingFaultHandler)
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel,
|
||||||
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
|
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
|
||||||
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
|
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
|
||||||
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
|
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
|
||||||
|
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request)
|
||||||
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
|
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
|
||||||
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
|
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
|
||||||
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
|
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
|
||||||
|
@ -816,6 +817,18 @@ class ControllerApis(val requestChannel: RequestChannel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def handleAlterUserScramCredentials(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||||
|
val alterRequest = request.body[AlterUserScramCredentialsRequest]
|
||||||
|
authHelper.authorizeClusterOperation(request, ALTER)
|
||||||
|
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
|
||||||
|
OptionalLong.empty())
|
||||||
|
controller.alterUserScramCredentials(context, alterRequest.data)
|
||||||
|
.thenApply[Unit] { response =>
|
||||||
|
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||||
|
new AlterUserScramCredentialsResponse(response.setThrottleTimeMs(requestThrottleMs)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
|
||||||
val listRequest = request.body[ListPartitionReassignmentsRequest]
|
val listRequest = request.body[ListPartitionReassignmentsRequest]
|
||||||
authHelper.authorizeClusterOperation(request, DESCRIBE)
|
authHelper.authorizeClusterOperation(request, DESCRIBE)
|
||||||
|
|
|
@ -3305,17 +3305,23 @@ class KafkaApis(val requestChannel: RequestChannel,
|
||||||
}
|
}
|
||||||
|
|
||||||
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
|
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
|
||||||
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
|
|
||||||
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
|
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
|
||||||
|
|
||||||
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
|
if (!authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
|
||||||
val result = zkSupport.adminManager.describeUserScramCredentials(
|
|
||||||
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
|
|
||||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
|
||||||
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
|
|
||||||
} else {
|
|
||||||
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||||
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
|
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
|
||||||
|
} else {
|
||||||
|
metadataSupport match {
|
||||||
|
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
|
||||||
|
val result = adminManager.describeUserScramCredentials(
|
||||||
|
Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
|
||||||
|
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||||
|
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
|
||||||
|
case RaftSupport(_, metadataCache) =>
|
||||||
|
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
|
||||||
|
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||||
|
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3627,12 +3633,4 @@ object KafkaApis {
|
||||||
private def unsupported(text: String): Exception = {
|
private def unsupported(text: String): Exception = {
|
||||||
new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
|
new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
|
||||||
}
|
}
|
||||||
|
|
||||||
private def notYetSupported(request: RequestChannel.Request): Exception = {
|
|
||||||
notYetSupported(request.header.apiKey().toString)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def notYetSupported(text: String): Exception = {
|
|
||||||
new UnsupportedVersionException(s"Not yet supported when using a Raft-based metadata quorum: $text")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.{OptionalInt, Properties}
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import kafka.coordinator.transaction.TransactionCoordinator
|
import kafka.coordinator.transaction.TransactionCoordinator
|
||||||
import kafka.log.{LogManager, UnifiedLog}
|
import kafka.log.{LogManager, UnifiedLog}
|
||||||
|
import kafka.security.CredentialProvider
|
||||||
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
|
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
|
@ -104,6 +105,7 @@ class BrokerMetadataPublisher(
|
||||||
clientQuotaMetadataManager: ClientQuotaMetadataManager,
|
clientQuotaMetadataManager: ClientQuotaMetadataManager,
|
||||||
var dynamicConfigPublisher: DynamicConfigPublisher,
|
var dynamicConfigPublisher: DynamicConfigPublisher,
|
||||||
private val _authorizer: Option[Authorizer],
|
private val _authorizer: Option[Authorizer],
|
||||||
|
credentialProvider: CredentialProvider,
|
||||||
fatalFaultHandler: FaultHandler,
|
fatalFaultHandler: FaultHandler,
|
||||||
metadataPublishingFaultHandler: FaultHandler
|
metadataPublishingFaultHandler: FaultHandler
|
||||||
) extends MetadataPublisher with Logging {
|
) extends MetadataPublisher with Logging {
|
||||||
|
@ -223,6 +225,21 @@ class BrokerMetadataPublisher(
|
||||||
s"quotas in ${deltaName}", t)
|
s"quotas in ${deltaName}", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply changes to SCRAM credentials.
|
||||||
|
Option(delta.scramDelta()).foreach { scramDelta =>
|
||||||
|
scramDelta.changes().forEach {
|
||||||
|
case (mechanism, userChanges) =>
|
||||||
|
userChanges.forEach {
|
||||||
|
case (userName, change) =>
|
||||||
|
if (change.isPresent) {
|
||||||
|
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential(mechanism))
|
||||||
|
} else {
|
||||||
|
credentialProvider.removeCredentials(mechanism, userName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Apply changes to ACLs. This needs to be handled carefully because while we are
|
// Apply changes to ACLs. This needs to be handled carefully because while we are
|
||||||
// applying these changes, the Authorizer is continuing to return authorization
|
// applying these changes, the Authorizer is continuing to return authorization
|
||||||
// results in other threads. We never want to expose an invalid state. For example,
|
// results in other threads. We never want to expose an invalid state. For example,
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom
|
||||||
import kafka.admin.BrokerMetadata
|
import kafka.admin.BrokerMetadata
|
||||||
import org.apache.kafka.common.config.ConfigResource
|
import org.apache.kafka.common.config.ConfigResource
|
||||||
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
|
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
|
||||||
|
import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
|
||||||
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
|
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
|
||||||
import org.apache.kafka.server.common.MetadataVersion
|
import org.apache.kafka.server.common.MetadataVersion
|
||||||
|
|
||||||
|
@ -379,6 +380,10 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
|
||||||
_currentImage.clientQuotas().describe(request)
|
_currentImage.clientQuotas().describe(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
|
||||||
|
_currentImage.scram().describe(request)
|
||||||
|
}
|
||||||
|
|
||||||
override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
|
override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
|
||||||
|
|
||||||
override def features(): FinalizedFeaturesAndEpoch = {
|
override def features(): FinalizedFeaturesAndEpoch = {
|
||||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
||||||
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
||||||
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
||||||
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
||||||
|
@ -124,6 +126,14 @@ public class MockController implements Controller {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
|
||||||
|
ControllerRequestContext context,
|
||||||
|
AlterUserScramCredentialsRequestData request
|
||||||
|
) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
synchronized public CompletableFuture<CreateTopicsResponseData> createTopics(
|
synchronized public CompletableFuture<CreateTopicsResponseData> createTopics(
|
||||||
ControllerRequestContext context,
|
ControllerRequestContext context,
|
||||||
|
|
|
@ -19,9 +19,11 @@ package kafka.server
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets
|
import java.nio.charset.StandardCharsets
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.Properties
|
import kafka.utils.TestUtils
|
||||||
|
import kafka.utils.TestInfoUtils
|
||||||
import kafka.network.SocketServer
|
import kafka.network.SocketServer
|
||||||
import kafka.security.authorizer.AclAuthorizer
|
import kafka.security.authorizer.AclAuthorizer
|
||||||
|
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
|
||||||
import org.apache.kafka.clients.admin.ScramMechanism
|
import org.apache.kafka.clients.admin.ScramMechanism
|
||||||
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
|
||||||
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult
|
||||||
|
@ -31,8 +33,11 @@ import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, Alter
|
||||||
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
|
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
|
||||||
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
||||||
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
|
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
|
||||||
|
import org.apache.kafka.server.common.MetadataVersion
|
||||||
|
import org.junit.jupiter.api.{Test, BeforeEach, TestInfo}
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
|
@ -42,10 +47,25 @@ import scala.jdk.CollectionConverters._
|
||||||
* Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described.
|
* Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described.
|
||||||
*/
|
*/
|
||||||
class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
override def brokerPropertyOverrides(properties: Properties): Unit = {
|
|
||||||
properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
|
protected var testMetadataVersion = MetadataVersion.latest
|
||||||
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
|
override protected def metadataVersion = testMetadataVersion
|
||||||
properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
|
|
||||||
|
@BeforeEach
|
||||||
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
|
if (TestInfoUtils.isKRaft(testInfo)) {
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
|
||||||
|
if (testInfo.getDisplayName().contains("quorum=kraft-IBP_3_4")) {
|
||||||
|
testMetadataVersion = MetadataVersion.IBP_3_4_IV0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
|
||||||
|
|
||||||
|
}
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
|
||||||
|
|
||||||
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val saltedPasswordBytes = "saltedPassword".getBytes(StandardCharsets.UTF_8)
|
private val saltedPasswordBytes = "saltedPassword".getBytes(StandardCharsets.UTF_8)
|
||||||
|
@ -54,8 +74,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
private val user2 = "user2"
|
private val user2 = "user2"
|
||||||
private val unknownUser = "unknownUser"
|
private val unknownUser = "unknownUser"
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterNothing(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterNothing(quorum: String): Unit = {
|
||||||
val request = new AlterUserScramCredentialsRequest.Builder(
|
val request = new AlterUserScramCredentialsRequest.Builder(
|
||||||
new AlterUserScramCredentialsRequestData()
|
new AlterUserScramCredentialsRequestData()
|
||||||
.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
|
.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
|
||||||
|
@ -66,14 +87,29 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals(0, results.size)
|
assertEquals(0, results.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterSameThingTwice(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterSameThingTwice(quorum: String): Unit = {
|
||||||
val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
|
|
||||||
|
// Create user1 and user2 so delete returns duplicate instead of no resource
|
||||||
|
val init_requests = List (
|
||||||
|
new AlterUserScramCredentialsRequest.Builder(
|
||||||
|
new AlterUserScramCredentialsRequestData()
|
||||||
|
.setDeletions(util.Collections.emptyList())
|
||||||
|
.setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
|
||||||
|
)
|
||||||
|
init_requests.foreach(request => {
|
||||||
|
val response = sendAlterUserScramCredentialsRequest(request)
|
||||||
|
val results = response.data.results
|
||||||
|
checkNoErrorsAlteringCredentials(results)
|
||||||
|
})
|
||||||
|
|
||||||
val requests = List (
|
val requests = List (
|
||||||
new AlterUserScramCredentialsRequest.Builder(
|
new AlterUserScramCredentialsRequest.Builder(
|
||||||
new AlterUserScramCredentialsRequestData()
|
new AlterUserScramCredentialsRequestData()
|
||||||
|
@ -92,8 +128,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterEmptyUser(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterEmptyUser(quorum: String): Unit = {
|
||||||
val deletionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val deletionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
val upsertionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val upsertionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
|
@ -120,8 +157,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterUnknownMechanism(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterUnknownMechanism(quorum: String): Unit = {
|
||||||
val deletionUnknown1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.UNKNOWN.`type`)
|
val deletionUnknown1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.UNKNOWN.`type`)
|
||||||
val deletionValid1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
val deletionValid1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
val deletionUnknown2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(10.toByte)
|
val deletionUnknown2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(10.toByte)
|
||||||
|
@ -147,8 +185,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
results.asScala.foreach(result => assertEquals("Unknown SCRAM mechanism", result.errorMessage))
|
results.asScala.foreach(result => assertEquals("Unknown SCRAM mechanism", result.errorMessage))
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterTooFewIterations(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterTooFewIterations(quorum: String): Unit = {
|
||||||
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
|
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
|
||||||
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(1)
|
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(1)
|
||||||
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
|
@ -163,8 +202,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals("Too few iterations", results.get(0).errorMessage)
|
assertEquals("Too few iterations", results.get(0).errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterTooManyIterations(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterTooManyIterations(quorum: String): Unit = {
|
||||||
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
|
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
|
||||||
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(Integer.MAX_VALUE)
|
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(Integer.MAX_VALUE)
|
||||||
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
|
@ -179,8 +219,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals("Too many iterations", results.get(0).errorMessage)
|
assertEquals("Too many iterations", results.get(0).errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testDeleteSomethingThatDoesNotExist(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testDeleteSomethingThatDoesNotExist(quorum: String): Unit = {
|
||||||
val request = new AlterUserScramCredentialsRequest.Builder(
|
val request = new AlterUserScramCredentialsRequest.Builder(
|
||||||
new AlterUserScramCredentialsRequestData()
|
new AlterUserScramCredentialsRequestData()
|
||||||
.setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
|
.setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
|
||||||
|
@ -205,10 +246,11 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
checkAllErrorsAlteringCredentials(results, Errors.NOT_CONTROLLER, "when routed incorrectly to a non-Controller broker")
|
checkAllErrorsAlteringCredentials(results, Errors.NOT_CONTROLLER, "when routed incorrectly to a non-Controller broker")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testAlterAndDescribe(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testAlterAndDescribe(quorum: String): Unit = {
|
||||||
// create a bunch of credentials
|
// create a bunch of credentials
|
||||||
val request1 = new AlterUserScramCredentialsRequest.Builder(
|
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
|
||||||
new AlterUserScramCredentialsRequestData()
|
new AlterUserScramCredentialsRequestData()
|
||||||
.setUpsertions(util.Arrays.asList(
|
.setUpsertions(util.Arrays.asList(
|
||||||
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
||||||
|
@ -216,6 +258,16 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
.setIterations(4096)
|
.setIterations(4096)
|
||||||
.setSalt(saltBytes)
|
.setSalt(saltBytes)
|
||||||
.setSaltedPassword(saltedPasswordBytes),
|
.setSaltedPassword(saltedPasswordBytes),
|
||||||
|
))).build()
|
||||||
|
val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
|
||||||
|
assertEquals(1, results1_0.size)
|
||||||
|
checkNoErrorsAlteringCredentials(results1_0)
|
||||||
|
checkUserAppearsInAlterResults(results1_0, user1)
|
||||||
|
|
||||||
|
// When creating credentials, do not update the same user more than once per request
|
||||||
|
val request1_1 = new AlterUserScramCredentialsRequest.Builder(
|
||||||
|
new AlterUserScramCredentialsRequestData()
|
||||||
|
.setUpsertions(util.Arrays.asList(
|
||||||
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
||||||
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_512.`type`)
|
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_512.`type`)
|
||||||
.setIterations(8192)
|
.setIterations(8192)
|
||||||
|
@ -227,11 +279,16 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
.setSalt(saltBytes)
|
.setSalt(saltBytes)
|
||||||
.setSaltedPassword(saltedPasswordBytes),
|
.setSaltedPassword(saltedPasswordBytes),
|
||||||
))).build()
|
))).build()
|
||||||
val results1 = sendAlterUserScramCredentialsRequest(request1).data.results
|
val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
|
||||||
assertEquals(2, results1.size)
|
assertEquals(2, results1_1.size)
|
||||||
checkNoErrorsAlteringCredentials(results1)
|
checkNoErrorsAlteringCredentials(results1_1)
|
||||||
checkUserAppearsInAlterResults(results1, user1)
|
checkUserAppearsInAlterResults(results1_1, user1)
|
||||||
checkUserAppearsInAlterResults(results1, user2)
|
checkUserAppearsInAlterResults(results1_1, user2)
|
||||||
|
|
||||||
|
// KRaft is eventually consistent so it is possible to call describe before
|
||||||
|
// the credential is propogated from the controller to the broker.
|
||||||
|
TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 2,
|
||||||
|
"describeAllWithNoTopLevelErrorConfirmed does not see 2 users");
|
||||||
|
|
||||||
// now describe them all
|
// now describe them all
|
||||||
val results2 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
val results2 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
||||||
|
@ -290,6 +347,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
checkUserAppearsInAlterResults(results4, user1)
|
checkUserAppearsInAlterResults(results4, user1)
|
||||||
checkUserAppearsInAlterResults(results4, user2)
|
checkUserAppearsInAlterResults(results4, user2)
|
||||||
|
|
||||||
|
TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 1,
|
||||||
|
"describeAllWithNoTopLevelErrorConfirmed does not see only 1 user");
|
||||||
|
|
||||||
// now describe them all, which should just yield 1 credential
|
// now describe them all, which should just yield 1 credential
|
||||||
val results5 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
val results5 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
||||||
assertEquals(1, results5.size)
|
assertEquals(1, results5.size)
|
||||||
|
@ -307,16 +367,40 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
checkNoErrorsAlteringCredentials(results6)
|
checkNoErrorsAlteringCredentials(results6)
|
||||||
checkUserAppearsInAlterResults(results6, user1)
|
checkUserAppearsInAlterResults(results6, user1)
|
||||||
|
|
||||||
|
TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 0,
|
||||||
|
"describeAllWithNoTopLevelErrorConfirmed does not see empty user");
|
||||||
|
|
||||||
// now describe them all, which should yield 0 credentials
|
// now describe them all, which should yield 0 credentials
|
||||||
val results7 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
val results7 = describeAllWithNoTopLevelErrorConfirmed().data.results
|
||||||
assertEquals(0, results7.size)
|
assertEquals(0, results7.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def sendAlterUserScramCredentialsRequest(request: AlterUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): AlterUserScramCredentialsResponse = {
|
/*
|
||||||
|
* Test that SCRAM alter command on KRaft cluster with IBP version less that IBP_3_5 fails
|
||||||
|
*/
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
|
@ValueSource(strings = Array("kraft-IBP_3_4"))
|
||||||
|
def testMetadataVersionTooLow(quorum: String): Unit = {
|
||||||
|
val upsertionMetadataVersionTooLow = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
|
||||||
|
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(8192)
|
||||||
|
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
|
||||||
|
val request = new AlterUserScramCredentialsRequest.Builder(
|
||||||
|
new AlterUserScramCredentialsRequestData()
|
||||||
|
.setDeletions(util.Collections.emptyList())
|
||||||
|
.setUpsertions(util.Arrays.asList(upsertionMetadataVersionTooLow))).build()
|
||||||
|
val response = sendAlterUserScramCredentialsRequest(request)
|
||||||
|
val results = response.data.results
|
||||||
|
assertEquals(1, results.size)
|
||||||
|
checkAllErrorsAlteringCredentials(results, Errors.UNSUPPORTED_VERSION,
|
||||||
|
"when altering the credentials on unsupported IPB version")
|
||||||
|
assertEquals("The current metadata version does not support SCRAM", results.get(0).errorMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def sendAlterUserScramCredentialsRequest(request: AlterUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): AlterUserScramCredentialsResponse = {
|
||||||
connectAndReceive[AlterUserScramCredentialsResponse](request, destination = socketServer)
|
connectAndReceive[AlterUserScramCredentialsResponse](request, destination = socketServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): DescribeUserScramCredentialsResponse = {
|
private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): DescribeUserScramCredentialsResponse = {
|
||||||
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
|
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,9 +17,10 @@
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.Properties
|
import kafka.utils.TestInfoUtils
|
||||||
import kafka.network.SocketServer
|
import kafka.network.SocketServer
|
||||||
import kafka.security.authorizer.AclAuthorizer
|
import kafka.security.authorizer.AclAuthorizer
|
||||||
|
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
|
||||||
import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
|
import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
|
||||||
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
|
||||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||||
|
@ -27,8 +28,10 @@ import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, De
|
||||||
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
|
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
|
||||||
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
|
||||||
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
|
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
|
||||||
|
import org.junit.jupiter.api.{Test, BeforeEach, TestInfo}
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
|
@ -38,15 +41,23 @@ import scala.jdk.CollectionConverters._
|
||||||
* Testing the API for the case where there are actually credentials to describe is performed elsewhere.
|
* Testing the API for the case where there are actually credentials to describe is performed elsewhere.
|
||||||
*/
|
*/
|
||||||
class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
|
@BeforeEach
|
||||||
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
|
if (TestInfoUtils.isKRaft(testInfo)) {
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
|
||||||
|
} else {
|
||||||
|
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
|
||||||
|
|
||||||
override def brokerPropertyOverrides(properties: Properties): Unit = {
|
}
|
||||||
properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
|
this.serverConfig.setProperty(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
|
||||||
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[DescribeCredentialsTest.TestAuthorizer].getName)
|
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
|
||||||
properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[DescribeCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
|
|
||||||
|
super.setUp(testInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testDescribeNothing(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testDescribeNothing(quorum: String): Unit = {
|
||||||
val request = new DescribeUserScramCredentialsRequest.Builder(
|
val request = new DescribeUserScramCredentialsRequest.Builder(
|
||||||
new DescribeUserScramCredentialsRequestData()).build()
|
new DescribeUserScramCredentialsRequestData()).build()
|
||||||
val response = sendDescribeUserScramCredentialsRequest(request)
|
val response = sendDescribeUserScramCredentialsRequest(request)
|
||||||
|
@ -56,8 +67,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals(0, response.data.results.size, "Expected no credentials when describing everything and there are no credentials")
|
assertEquals(0, response.data.results.size, "Expected no credentials when describing everything and there are no credentials")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testDescribeWithNull(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testDescribeWithNull(quorum: String): Unit = {
|
||||||
val request = new DescribeUserScramCredentialsRequest.Builder(
|
val request = new DescribeUserScramCredentialsRequest.Builder(
|
||||||
new DescribeUserScramCredentialsRequestData().setUsers(null)).build()
|
new DescribeUserScramCredentialsRequestData().setUsers(null)).build()
|
||||||
val response = sendDescribeUserScramCredentialsRequest(request)
|
val response = sendDescribeUserScramCredentialsRequest(request)
|
||||||
|
@ -77,8 +89,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals(Errors.NONE.code, error, "Did not expect controller error when routed to non-controller")
|
assertEquals(Errors.NONE.code, error, "Did not expect controller error when routed to non-controller")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testDescribeSameUserTwice(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testDescribeSameUserTwice(quorum: String): Unit = {
|
||||||
val user = "user1"
|
val user = "user1"
|
||||||
val userName = new UserName().setName(user)
|
val userName = new UserName().setName(user)
|
||||||
val request = new DescribeUserScramCredentialsRequest.Builder(
|
val request = new DescribeUserScramCredentialsRequest.Builder(
|
||||||
|
@ -92,8 +105,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals(s"Cannot describe SCRAM credentials for the same user twice in a single request: $user", result.errorMessage)
|
assertEquals(s"Cannot describe SCRAM credentials for the same user twice in a single request: $user", result.errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
def testUnknownUser(): Unit = {
|
@ValueSource(strings = Array("kraft", "zk"))
|
||||||
|
def testUnknownUser(quorum: String): Unit = {
|
||||||
val unknownUser = "unknownUser"
|
val unknownUser = "unknownUser"
|
||||||
val request = new DescribeUserScramCredentialsRequest.Builder(
|
val request = new DescribeUserScramCredentialsRequest.Builder(
|
||||||
new DescribeUserScramCredentialsRequestData().setUsers(List(new UserName().setName(unknownUser)).asJava)).build()
|
new DescribeUserScramCredentialsRequestData().setUsers(List(new UserName().setName(unknownUser)).asJava)).build()
|
||||||
|
@ -106,7 +120,7 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
assertEquals(s"Attempt to describe a user credential that does not exist: $unknownUser", result.errorMessage)
|
assertEquals(s"Attempt to describe a user credential that does not exist: $unknownUser", result.errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): DescribeUserScramCredentialsResponse = {
|
private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): DescribeUserScramCredentialsResponse = {
|
||||||
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
|
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,8 @@ object MetadataCacheTest {
|
||||||
image.configs(),
|
image.configs(),
|
||||||
image.clientQuotas(),
|
image.clientQuotas(),
|
||||||
image.producerIds(),
|
image.producerIds(),
|
||||||
image.acls())
|
image.acls(),
|
||||||
|
image.scram())
|
||||||
val delta = new MetadataDelta.Builder().setImage(partialImage).build()
|
val delta = new MetadataDelta.Builder().setImage(partialImage).build()
|
||||||
|
|
||||||
def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
|
def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
|
||||||
|
|
|
@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||||
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
|
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
|
||||||
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
|
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
|
||||||
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
|
import org.apache.kafka.image._
|
||||||
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
|
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
|
||||||
import org.apache.kafka.metadata.LeaderRecoveryState
|
import org.apache.kafka.metadata.LeaderRecoveryState
|
||||||
import org.apache.kafka.server.common.OffsetAndEpoch
|
import org.apache.kafka.server.common.OffsetAndEpoch
|
||||||
|
@ -4362,7 +4362,8 @@ class ReplicaManagerTest {
|
||||||
ConfigurationsImage.EMPTY,
|
ConfigurationsImage.EMPTY,
|
||||||
ClientQuotasImage.EMPTY,
|
ClientQuotasImage.EMPTY,
|
||||||
ProducerIdsImage.EMPTY,
|
ProducerIdsImage.EMPTY,
|
||||||
AclsImage.EMPTY
|
AclsImage.EMPTY,
|
||||||
|
ScramImage.EMPTY
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.kafka.raft.OffsetAndEpoch
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion
|
import org.apache.kafka.server.common.ApiMessageAndVersion
|
||||||
import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
|
import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
|
||||||
|
import org.junit.jupiter.api.Timeout
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
|
@ -58,7 +59,7 @@ class BrokerMetadataSnapshotterTest {
|
||||||
new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
|
new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
1024,
|
4096,
|
||||||
MemoryPool.NONE,
|
MemoryPool.NONE,
|
||||||
Time.SYSTEM,
|
Time.SYSTEM,
|
||||||
lastContainedLogTime,
|
lastContainedLogTime,
|
||||||
|
@ -96,6 +97,7 @@ class BrokerMetadataSnapshotterTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@Timeout(30)
|
||||||
def testCreateSnapshot(): Unit = {
|
def testCreateSnapshot(): Unit = {
|
||||||
val writerBuilder = new MockSnapshotWriterBuilder()
|
val writerBuilder = new MockSnapshotWriterBuilder()
|
||||||
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
|
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
||||||
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
||||||
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
||||||
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
||||||
|
@ -67,6 +69,19 @@ public interface Controller extends AclMutator, AutoCloseable {
|
||||||
AlterPartitionRequestData request
|
AlterPartitionRequestData request
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alter the user SCRAM credentials.
|
||||||
|
*
|
||||||
|
* @param context The controller request context.
|
||||||
|
* @param request The AlterUserScramCredentialsRequest data.
|
||||||
|
*
|
||||||
|
* @return A future yielding the response.
|
||||||
|
*/
|
||||||
|
CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
|
||||||
|
ControllerRequestContext context,
|
||||||
|
AlterUserScramCredentialsRequestData request
|
||||||
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a batch of topics.
|
* Create a batch of topics.
|
||||||
*
|
*
|
||||||
|
|
|
@ -144,6 +144,8 @@ final class ControllerMetricsManager {
|
||||||
case PRODUCER_IDS_RECORD:
|
case PRODUCER_IDS_RECORD:
|
||||||
case ACCESS_CONTROL_ENTRY_RECORD:
|
case ACCESS_CONTROL_ENTRY_RECORD:
|
||||||
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
||||||
|
case USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
|
case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
case NO_OP_RECORD:
|
case NO_OP_RECORD:
|
||||||
case ZK_MIGRATION_STATE_RECORD:
|
case ZK_MIGRATION_STATE_RECORD:
|
||||||
// These record types do not affect metrics
|
// These record types do not affect metrics
|
||||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
import org.apache.kafka.common.message.AlterPartitionResponseData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
|
||||||
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
||||||
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
|
||||||
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
|
||||||
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
|
||||||
|
@ -62,6 +64,8 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
|
||||||
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
|
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
|
||||||
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
|
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
|
||||||
import org.apache.kafka.common.metadata.RemoveTopicRecord;
|
import org.apache.kafka.common.metadata.RemoveTopicRecord;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
|
||||||
import org.apache.kafka.common.metadata.TopicRecord;
|
import org.apache.kafka.common.metadata.TopicRecord;
|
||||||
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
|
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
|
||||||
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
|
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
|
||||||
|
@ -1409,6 +1413,12 @@ public final class QuorumController implements Controller {
|
||||||
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
||||||
aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
|
aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
|
||||||
break;
|
break;
|
||||||
|
case USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
|
scramControlManager.replay((UserScramCredentialRecord) message);
|
||||||
|
break;
|
||||||
|
case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
|
scramControlManager.replay((RemoveUserScramCredentialRecord) message);
|
||||||
|
break;
|
||||||
case NO_OP_RECORD:
|
case NO_OP_RECORD:
|
||||||
// NoOpRecord is an empty record and doesn't need to be replayed
|
// NoOpRecord is an empty record and doesn't need to be replayed
|
||||||
break;
|
break;
|
||||||
|
@ -1529,6 +1539,11 @@ public final class QuorumController implements Controller {
|
||||||
*/
|
*/
|
||||||
private final ReplicationControlManager replicationControl;
|
private final ReplicationControlManager replicationControl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages SCRAM credentials, if there are any.
|
||||||
|
*/
|
||||||
|
private final ScramControlManager scramControlManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ClusterMetadataAuthorizer, if one is configured. Note that this will still be
|
* The ClusterMetadataAuthorizer, if one is configured. Note that this will still be
|
||||||
* Optional.empty() if an Authorizer is configured that doesn't use __cluster_metadata.
|
* Optional.empty() if an Authorizer is configured that doesn't use __cluster_metadata.
|
||||||
|
@ -1728,6 +1743,10 @@ public final class QuorumController implements Controller {
|
||||||
setCreateTopicPolicy(createTopicPolicy).
|
setCreateTopicPolicy(createTopicPolicy).
|
||||||
setFeatureControl(featureControl).
|
setFeatureControl(featureControl).
|
||||||
build();
|
build();
|
||||||
|
this.scramControlManager = new ScramControlManager.Builder().
|
||||||
|
setLogContext(logContext).
|
||||||
|
setSnapshotRegistry(snapshotRegistry).
|
||||||
|
build();
|
||||||
this.authorizer = authorizer;
|
this.authorizer = authorizer;
|
||||||
authorizer.ifPresent(a -> a.setAclMutator(this));
|
authorizer.ifPresent(a -> a.setAclMutator(this));
|
||||||
this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
|
this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
|
||||||
|
@ -1762,6 +1781,18 @@ public final class QuorumController implements Controller {
|
||||||
() -> replicationControl.alterPartition(context, request));
|
() -> replicationControl.alterPartition(context, request));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
|
||||||
|
ControllerRequestContext context,
|
||||||
|
AlterUserScramCredentialsRequestData request
|
||||||
|
) {
|
||||||
|
if (request.deletions().isEmpty() && request.upsertions().isEmpty()) {
|
||||||
|
return CompletableFuture.completedFuture(new AlterUserScramCredentialsResponseData());
|
||||||
|
}
|
||||||
|
return appendWriteEvent("alterUserScramCredentials", context.deadlineNs(),
|
||||||
|
() -> scramControlManager.alterCredentials(request, featureControl.metadataVersion()));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<CreateTopicsResponseData> createTopics(
|
public CompletableFuture<CreateTopicsResponseData> createTopics(
|
||||||
ControllerRequestContext context,
|
ControllerRequestContext context,
|
||||||
|
|
|
@ -0,0 +1,324 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.controller;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.requests.ApiError;
|
||||||
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
|
import org.apache.kafka.timeline.TimelineHashMap;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.NONE;
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
|
||||||
|
import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Manages SCRAM credentials.
|
||||||
|
*/
|
||||||
|
public class ScramControlManager {
|
||||||
|
static final int MAX_ITERATIONS = 16384;
|
||||||
|
|
||||||
|
static class Builder {
|
||||||
|
private LogContext logContext = null;
|
||||||
|
private SnapshotRegistry snapshotRegistry = null;
|
||||||
|
|
||||||
|
Builder setLogContext(LogContext logContext) {
|
||||||
|
this.logContext = logContext;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
|
||||||
|
this.snapshotRegistry = snapshotRegistry;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScramControlManager build() {
|
||||||
|
if (logContext == null) logContext = new LogContext();
|
||||||
|
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
|
||||||
|
return new ScramControlManager(logContext,
|
||||||
|
snapshotRegistry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ScramCredentialKey {
|
||||||
|
private final String username;
|
||||||
|
private final ScramMechanism mechanism;
|
||||||
|
|
||||||
|
ScramCredentialKey(String username, ScramMechanism mechanism) {
|
||||||
|
this.username = username;
|
||||||
|
this.mechanism = mechanism;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(username, mechanism);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null) return false;
|
||||||
|
if (!(o.getClass() == this.getClass())) return false;
|
||||||
|
ScramCredentialKey other = (ScramCredentialKey) o;
|
||||||
|
return username.equals(other.username) &&
|
||||||
|
mechanism.equals(other.mechanism);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ScramCredentialKey" +
|
||||||
|
"(username=" + username +
|
||||||
|
", mechanism=" + mechanism +
|
||||||
|
")";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ScramCredentialValue {
|
||||||
|
private final byte[] salt;
|
||||||
|
private final byte[] saltedPassword;
|
||||||
|
private final int iterations;
|
||||||
|
|
||||||
|
ScramCredentialValue(
|
||||||
|
byte[] salt,
|
||||||
|
byte[] saltedPassword,
|
||||||
|
int iterations
|
||||||
|
) {
|
||||||
|
this.salt = salt;
|
||||||
|
this.saltedPassword = saltedPassword;
|
||||||
|
this.iterations = iterations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(salt, saltedPassword, iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null) return false;
|
||||||
|
if (!(o.getClass() == this.getClass())) return false;
|
||||||
|
ScramCredentialValue other = (ScramCredentialValue) o;
|
||||||
|
return Arrays.equals(salt, other.salt) &&
|
||||||
|
Arrays.equals(saltedPassword, other.saltedPassword) &&
|
||||||
|
iterations == other.iterations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ScramCredentialValue" +
|
||||||
|
"(salt=" + "[hidden]" +
|
||||||
|
", saltedPassword=" + "[hidden]" +
|
||||||
|
", iterations=" + "[hidden]" +
|
||||||
|
")";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Logger log;
|
||||||
|
private final TimelineHashMap<ScramCredentialKey, ScramCredentialValue> credentials;
|
||||||
|
|
||||||
|
private ScramControlManager(
|
||||||
|
LogContext logContext,
|
||||||
|
SnapshotRegistry snapshotRegistry
|
||||||
|
) {
|
||||||
|
this.log = logContext.logger(ScramControlManager.class);
|
||||||
|
this.credentials = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Pass in the MetadataVersion so that we can return a response to the caller
|
||||||
|
* if the current metadataVersion is too low.
|
||||||
|
*/
|
||||||
|
public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
|
||||||
|
AlterUserScramCredentialsRequestData request,
|
||||||
|
MetadataVersion metadataVersion
|
||||||
|
) {
|
||||||
|
boolean scramIsSupported = metadataVersion.isScramSupported();
|
||||||
|
Map<String, ScramCredentialDeletion> userToDeletion = new HashMap<>();
|
||||||
|
Map<String, ScramCredentialUpsertion> userToUpsert = new HashMap<>();
|
||||||
|
Map<String, ApiError> userToError = new HashMap<>();
|
||||||
|
|
||||||
|
for (ScramCredentialDeletion deletion : request.deletions()) {
|
||||||
|
if (!userToError.containsKey(deletion.name())) {
|
||||||
|
if (userToDeletion.remove(deletion.name()) != null) {
|
||||||
|
userToError.put(deletion.name(), new ApiError(DUPLICATE_RESOURCE,
|
||||||
|
"A user credential cannot be altered twice in the same request"));
|
||||||
|
} else {
|
||||||
|
if (!scramIsSupported) {
|
||||||
|
userToError.put(deletion.name(), new ApiError(UNSUPPORTED_VERSION,
|
||||||
|
"The current metadata version does not support SCRAM"));
|
||||||
|
} else {
|
||||||
|
ApiError error = validateDeletion(deletion);
|
||||||
|
if (error.isFailure()) {
|
||||||
|
userToError.put(deletion.name(), error);
|
||||||
|
} else {
|
||||||
|
userToDeletion.put(deletion.name(), deletion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (ScramCredentialUpsertion upsertion : request.upsertions()) {
|
||||||
|
if (!userToError.containsKey(upsertion.name())) {
|
||||||
|
if (userToDeletion.remove(upsertion.name()) != null ||
|
||||||
|
userToUpsert.remove(upsertion.name()) != null) {
|
||||||
|
userToError.put(upsertion.name(), new ApiError(DUPLICATE_RESOURCE,
|
||||||
|
"A user credential cannot be altered twice in the same request"));
|
||||||
|
} else {
|
||||||
|
if (!scramIsSupported) {
|
||||||
|
userToError.put(upsertion.name(), new ApiError(UNSUPPORTED_VERSION,
|
||||||
|
"The current metadata version does not support SCRAM"));
|
||||||
|
} else {
|
||||||
|
ApiError error = validateUpsertion(upsertion);
|
||||||
|
if (error.isFailure()) {
|
||||||
|
userToError.put(upsertion.name(), error);
|
||||||
|
} else {
|
||||||
|
userToUpsert.put(upsertion.name(), upsertion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
AlterUserScramCredentialsResponseData response = new AlterUserScramCredentialsResponseData();
|
||||||
|
List<ApiMessageAndVersion> records = new ArrayList<>();
|
||||||
|
for (ScramCredentialDeletion deletion : userToDeletion.values()) {
|
||||||
|
response.results().add(new AlterUserScramCredentialsResult().
|
||||||
|
setUser(deletion.name()).
|
||||||
|
setErrorCode(NONE.code()).
|
||||||
|
setErrorMessage(null));
|
||||||
|
records.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
|
||||||
|
setName(deletion.name()).
|
||||||
|
setMechanism(deletion.mechanism()), (short) 0));
|
||||||
|
}
|
||||||
|
for (ScramCredentialUpsertion upsertion : userToUpsert.values()) {
|
||||||
|
response.results().add(new AlterUserScramCredentialsResult().
|
||||||
|
setUser(upsertion.name()).
|
||||||
|
setErrorCode(NONE.code()).
|
||||||
|
setErrorMessage(null));
|
||||||
|
records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
|
||||||
|
setName(upsertion.name()).
|
||||||
|
setMechanism(upsertion.mechanism()).
|
||||||
|
setSalt(upsertion.salt()).
|
||||||
|
setSaltedPassword(upsertion.saltedPassword()).
|
||||||
|
setIterations(upsertion.iterations()), (short) 0));
|
||||||
|
}
|
||||||
|
for (Entry<String, ApiError> entry : userToError.entrySet()) {
|
||||||
|
response.results().add(new AlterUserScramCredentialsResult().
|
||||||
|
setUser(entry.getKey()).
|
||||||
|
setErrorCode(entry.getValue().error().code()).
|
||||||
|
setErrorMessage(entry.getValue().message()));
|
||||||
|
}
|
||||||
|
return ControllerResult.atomicOf(records, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) {
|
||||||
|
ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism());
|
||||||
|
ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism);
|
||||||
|
if (error.isFailure()) return error;
|
||||||
|
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
|
||||||
|
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
|
||||||
|
if (upsertion.iterations() < internalMechanism.minIterations()) {
|
||||||
|
return new ApiError(UNACCEPTABLE_CREDENTIAL, "Too few iterations");
|
||||||
|
} else if (upsertion.iterations() > MAX_ITERATIONS) {
|
||||||
|
return new ApiError(UNACCEPTABLE_CREDENTIAL, "Too many iterations");
|
||||||
|
}
|
||||||
|
return ApiError.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
ApiError validateDeletion(ScramCredentialDeletion deletion) {
|
||||||
|
ApiError error = validateScramUsernameAndMechanism(deletion.name(),
|
||||||
|
ScramMechanism.fromType(deletion.mechanism()));
|
||||||
|
if (error.isFailure()) return error;
|
||||||
|
ScramCredentialKey key = new ScramCredentialKey(deletion.name(),
|
||||||
|
ScramMechanism.fromType(deletion.mechanism()));
|
||||||
|
if (!credentials.containsKey(key)) {
|
||||||
|
return new ApiError(RESOURCE_NOT_FOUND,
|
||||||
|
"Attempt to delete a user credential that does not exist");
|
||||||
|
}
|
||||||
|
return ApiError.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ApiError validateScramUsernameAndMechanism(
|
||||||
|
String username,
|
||||||
|
ScramMechanism mechanism
|
||||||
|
) {
|
||||||
|
if (username.isEmpty()) {
|
||||||
|
return new ApiError(UNACCEPTABLE_CREDENTIAL, "Username must not be empty");
|
||||||
|
}
|
||||||
|
if (mechanism == ScramMechanism.UNKNOWN) {
|
||||||
|
return new ApiError(UNSUPPORTED_SASL_MECHANISM, "Unknown SCRAM mechanism");
|
||||||
|
}
|
||||||
|
return ApiError.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replay(RemoveUserScramCredentialRecord record) {
|
||||||
|
ScramCredentialKey key = new ScramCredentialKey(record.name(),
|
||||||
|
ScramMechanism.fromType(record.mechanism()));
|
||||||
|
if (credentials.remove(key) == null) {
|
||||||
|
throw new RuntimeException("Unable to find credential to delete: " + key);
|
||||||
|
}
|
||||||
|
log.info("Removed SCRAM credential for {} with mechanism {}.",
|
||||||
|
key.username, key.mechanism);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replay(UserScramCredentialRecord record) {
|
||||||
|
ScramCredentialKey key = new ScramCredentialKey(record.name(),
|
||||||
|
ScramMechanism.fromType(record.mechanism()));
|
||||||
|
ScramCredentialValue value = new ScramCredentialValue(record.salt(),
|
||||||
|
record.saltedPassword(),
|
||||||
|
record.iterations());
|
||||||
|
if (credentials.put(key, value) == null) {
|
||||||
|
log.info("Created new SCRAM credential for {} with mechanism {}.",
|
||||||
|
key.username, key.mechanism);
|
||||||
|
} else {
|
||||||
|
log.info("Modified SCRAM credential for {} with mechanism {}.",
|
||||||
|
key.username, key.mechanism);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) {
|
||||||
|
return new ApiMessageAndVersion(new UserScramCredentialRecord().
|
||||||
|
setName(key.username).
|
||||||
|
setMechanism(key.mechanism.type()).
|
||||||
|
setSalt(value.salt).
|
||||||
|
setSaltedPassword(value.saltedPassword).
|
||||||
|
setIterations(value.iterations),
|
||||||
|
(short) 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -30,9 +30,11 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
|
||||||
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
|
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
|
||||||
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
|
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
|
||||||
import org.apache.kafka.common.metadata.RemoveTopicRecord;
|
import org.apache.kafka.common.metadata.RemoveTopicRecord;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
|
||||||
import org.apache.kafka.common.metadata.TopicRecord;
|
import org.apache.kafka.common.metadata.TopicRecord;
|
||||||
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
|
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
|
||||||
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
|
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
import org.apache.kafka.common.protocol.ApiMessage;
|
import org.apache.kafka.common.protocol.ApiMessage;
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
|
||||||
|
@ -72,6 +74,8 @@ public final class MetadataDelta {
|
||||||
|
|
||||||
private AclsDelta aclsDelta = null;
|
private AclsDelta aclsDelta = null;
|
||||||
|
|
||||||
|
private ScramDelta scramDelta = null;
|
||||||
|
|
||||||
public MetadataDelta(MetadataImage image) {
|
public MetadataDelta(MetadataImage image) {
|
||||||
this.image = image;
|
this.image = image;
|
||||||
}
|
}
|
||||||
|
@ -145,6 +149,15 @@ public final class MetadataDelta {
|
||||||
return aclsDelta;
|
return aclsDelta;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ScramDelta scramDelta() {
|
||||||
|
return scramDelta;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScramDelta getOrCreateScramDelta() {
|
||||||
|
if (scramDelta == null) scramDelta = new ScramDelta(image.scram());
|
||||||
|
return scramDelta;
|
||||||
|
}
|
||||||
|
|
||||||
public Optional<MetadataVersion> metadataVersionChanged() {
|
public Optional<MetadataVersion> metadataVersionChanged() {
|
||||||
if (featuresDelta == null) {
|
if (featuresDelta == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
|
@ -183,6 +196,9 @@ public final class MetadataDelta {
|
||||||
case REMOVE_TOPIC_RECORD:
|
case REMOVE_TOPIC_RECORD:
|
||||||
replay((RemoveTopicRecord) record);
|
replay((RemoveTopicRecord) record);
|
||||||
break;
|
break;
|
||||||
|
case USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
|
replay((UserScramCredentialRecord) record);
|
||||||
|
break;
|
||||||
case FEATURE_LEVEL_RECORD:
|
case FEATURE_LEVEL_RECORD:
|
||||||
replay((FeatureLevelRecord) record);
|
replay((FeatureLevelRecord) record);
|
||||||
break;
|
break;
|
||||||
|
@ -201,6 +217,9 @@ public final class MetadataDelta {
|
||||||
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
|
||||||
replay((RemoveAccessControlEntryRecord) record);
|
replay((RemoveAccessControlEntryRecord) record);
|
||||||
break;
|
break;
|
||||||
|
case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
|
||||||
|
replay((RemoveUserScramCredentialRecord) record);
|
||||||
|
break;
|
||||||
case NO_OP_RECORD:
|
case NO_OP_RECORD:
|
||||||
/* NoOpRecord is an empty record and doesn't need to be replayed beyond
|
/* NoOpRecord is an empty record and doesn't need to be replayed beyond
|
||||||
* updating the highest offset and epoch.
|
* updating the highest offset and epoch.
|
||||||
|
@ -251,6 +270,10 @@ public final class MetadataDelta {
|
||||||
getOrCreateConfigsDelta().replay(record, topicName);
|
getOrCreateConfigsDelta().replay(record, topicName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void replay(UserScramCredentialRecord record) {
|
||||||
|
getOrCreateScramDelta().replay(record);
|
||||||
|
}
|
||||||
|
|
||||||
public void replay(FeatureLevelRecord record) {
|
public void replay(FeatureLevelRecord record) {
|
||||||
getOrCreateFeaturesDelta().replay(record);
|
getOrCreateFeaturesDelta().replay(record);
|
||||||
featuresDelta.metadataVersionChange().ifPresent(changedMetadataVersion -> {
|
featuresDelta.metadataVersionChange().ifPresent(changedMetadataVersion -> {
|
||||||
|
@ -261,6 +284,7 @@ public final class MetadataDelta {
|
||||||
getOrCreateClientQuotasDelta().handleMetadataVersionChange(changedMetadataVersion);
|
getOrCreateClientQuotasDelta().handleMetadataVersionChange(changedMetadataVersion);
|
||||||
getOrCreateProducerIdsDelta().handleMetadataVersionChange(changedMetadataVersion);
|
getOrCreateProducerIdsDelta().handleMetadataVersionChange(changedMetadataVersion);
|
||||||
getOrCreateAclsDelta().handleMetadataVersionChange(changedMetadataVersion);
|
getOrCreateAclsDelta().handleMetadataVersionChange(changedMetadataVersion);
|
||||||
|
getOrCreateScramDelta().handleMetadataVersionChange(changedMetadataVersion);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,6 +308,10 @@ public final class MetadataDelta {
|
||||||
getOrCreateAclsDelta().replay(record);
|
getOrCreateAclsDelta().replay(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void replay(RemoveUserScramCredentialRecord record) {
|
||||||
|
getOrCreateScramDelta().replay(record);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create removal deltas for anything which was in the base image, but which was not
|
* Create removal deltas for anything which was in the base image, but which was not
|
||||||
* referenced in the snapshot records we just applied.
|
* referenced in the snapshot records we just applied.
|
||||||
|
@ -296,6 +324,7 @@ public final class MetadataDelta {
|
||||||
getOrCreateClientQuotasDelta().finishSnapshot();
|
getOrCreateClientQuotasDelta().finishSnapshot();
|
||||||
getOrCreateProducerIdsDelta().finishSnapshot();
|
getOrCreateProducerIdsDelta().finishSnapshot();
|
||||||
getOrCreateAclsDelta().finishSnapshot();
|
getOrCreateAclsDelta().finishSnapshot();
|
||||||
|
getOrCreateScramDelta().finishSnapshot();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MetadataImage apply(MetadataProvenance provenance) {
|
public MetadataImage apply(MetadataProvenance provenance) {
|
||||||
|
@ -341,6 +370,12 @@ public final class MetadataDelta {
|
||||||
} else {
|
} else {
|
||||||
newAcls = aclsDelta.apply();
|
newAcls = aclsDelta.apply();
|
||||||
}
|
}
|
||||||
|
ScramImage newScram;
|
||||||
|
if (scramDelta == null) {
|
||||||
|
newScram = image.scram();
|
||||||
|
} else {
|
||||||
|
newScram = scramDelta.apply();
|
||||||
|
}
|
||||||
return new MetadataImage(
|
return new MetadataImage(
|
||||||
provenance,
|
provenance,
|
||||||
newFeatures,
|
newFeatures,
|
||||||
|
@ -349,7 +384,8 @@ public final class MetadataDelta {
|
||||||
newConfigs,
|
newConfigs,
|
||||||
newClientQuotas,
|
newClientQuotas,
|
||||||
newProducerIds,
|
newProducerIds,
|
||||||
newAcls
|
newAcls,
|
||||||
|
newScram
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,6 +399,7 @@ public final class MetadataDelta {
|
||||||
", clientQuotasDelta=" + clientQuotasDelta +
|
", clientQuotasDelta=" + clientQuotasDelta +
|
||||||
", producerIdsDelta=" + producerIdsDelta +
|
", producerIdsDelta=" + producerIdsDelta +
|
||||||
", aclsDelta=" + aclsDelta +
|
", aclsDelta=" + aclsDelta +
|
||||||
|
", scramDelta=" + scramDelta +
|
||||||
')';
|
')';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,8 @@ public final class MetadataImage {
|
||||||
ConfigurationsImage.EMPTY,
|
ConfigurationsImage.EMPTY,
|
||||||
ClientQuotasImage.EMPTY,
|
ClientQuotasImage.EMPTY,
|
||||||
ProducerIdsImage.EMPTY,
|
ProducerIdsImage.EMPTY,
|
||||||
AclsImage.EMPTY);
|
AclsImage.EMPTY,
|
||||||
|
ScramImage.EMPTY);
|
||||||
|
|
||||||
private final MetadataProvenance provenance;
|
private final MetadataProvenance provenance;
|
||||||
|
|
||||||
|
@ -56,6 +57,8 @@ public final class MetadataImage {
|
||||||
|
|
||||||
private final AclsImage acls;
|
private final AclsImage acls;
|
||||||
|
|
||||||
|
private final ScramImage scram;
|
||||||
|
|
||||||
public MetadataImage(
|
public MetadataImage(
|
||||||
MetadataProvenance provenance,
|
MetadataProvenance provenance,
|
||||||
FeaturesImage features,
|
FeaturesImage features,
|
||||||
|
@ -64,7 +67,8 @@ public final class MetadataImage {
|
||||||
ConfigurationsImage configs,
|
ConfigurationsImage configs,
|
||||||
ClientQuotasImage clientQuotas,
|
ClientQuotasImage clientQuotas,
|
||||||
ProducerIdsImage producerIds,
|
ProducerIdsImage producerIds,
|
||||||
AclsImage acls
|
AclsImage acls,
|
||||||
|
ScramImage scram
|
||||||
) {
|
) {
|
||||||
this.provenance = provenance;
|
this.provenance = provenance;
|
||||||
this.features = features;
|
this.features = features;
|
||||||
|
@ -74,6 +78,7 @@ public final class MetadataImage {
|
||||||
this.clientQuotas = clientQuotas;
|
this.clientQuotas = clientQuotas;
|
||||||
this.producerIds = producerIds;
|
this.producerIds = producerIds;
|
||||||
this.acls = acls;
|
this.acls = acls;
|
||||||
|
this.scram = scram;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
|
@ -83,7 +88,8 @@ public final class MetadataImage {
|
||||||
configs.isEmpty() &&
|
configs.isEmpty() &&
|
||||||
clientQuotas.isEmpty() &&
|
clientQuotas.isEmpty() &&
|
||||||
producerIds.isEmpty() &&
|
producerIds.isEmpty() &&
|
||||||
acls.isEmpty();
|
acls.isEmpty() &&
|
||||||
|
scram.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MetadataProvenance provenance() {
|
public MetadataProvenance provenance() {
|
||||||
|
@ -126,6 +132,10 @@ public final class MetadataImage {
|
||||||
return acls;
|
return acls;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ScramImage scram() {
|
||||||
|
return scram;
|
||||||
|
}
|
||||||
|
|
||||||
public void write(ImageWriter writer, ImageWriterOptions options) {
|
public void write(ImageWriter writer, ImageWriterOptions options) {
|
||||||
// Features should be written out first so we can include the metadata.version at the beginning of the
|
// Features should be written out first so we can include the metadata.version at the beginning of the
|
||||||
// snapshot
|
// snapshot
|
||||||
|
@ -136,6 +146,7 @@ public final class MetadataImage {
|
||||||
clientQuotas.write(writer, options);
|
clientQuotas.write(writer, options);
|
||||||
producerIds.write(writer, options);
|
producerIds.write(writer, options);
|
||||||
acls.write(writer, options);
|
acls.write(writer, options);
|
||||||
|
scram.write(writer, options);
|
||||||
writer.close(true);
|
writer.close(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +161,8 @@ public final class MetadataImage {
|
||||||
configs.equals(other.configs) &&
|
configs.equals(other.configs) &&
|
||||||
clientQuotas.equals(other.clientQuotas) &&
|
clientQuotas.equals(other.clientQuotas) &&
|
||||||
producerIds.equals(other.producerIds) &&
|
producerIds.equals(other.producerIds) &&
|
||||||
acls.equals(other.acls);
|
acls.equals(other.acls) &&
|
||||||
|
scram.equals(other.scram);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -163,7 +175,8 @@ public final class MetadataImage {
|
||||||
configs,
|
configs,
|
||||||
clientQuotas,
|
clientQuotas,
|
||||||
producerIds,
|
producerIds,
|
||||||
acls);
|
acls,
|
||||||
|
scram);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -177,6 +190,7 @@ public final class MetadataImage {
|
||||||
", clientQuotas=" + clientQuotas +
|
", clientQuotas=" + clientQuotas +
|
||||||
", producerIdsImage=" + producerIds +
|
", producerIdsImage=" + producerIds +
|
||||||
", acls=" + acls +
|
", acls=" + acls +
|
||||||
|
", scram=" + scram +
|
||||||
")";
|
")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.image;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.security.scram.ScramCredential;
|
||||||
|
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
|
||||||
|
|
||||||
|
import java.security.GeneralSecurityException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the ACLs in the metadata image.
|
||||||
|
*
|
||||||
|
* This class is thread-safe.
|
||||||
|
*/
|
||||||
|
public final class ScramCredentialData {
|
||||||
|
private final byte[] salt;
|
||||||
|
private final byte[] saltedPassword;
|
||||||
|
private final int iterations;
|
||||||
|
|
||||||
|
static ScramCredentialData fromRecord(
|
||||||
|
UserScramCredentialRecord record
|
||||||
|
) {
|
||||||
|
return new ScramCredentialData(
|
||||||
|
record.salt(),
|
||||||
|
record.saltedPassword(),
|
||||||
|
record.iterations());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScramCredentialData(
|
||||||
|
byte[] salt,
|
||||||
|
byte[] saltedPassword,
|
||||||
|
int iterations
|
||||||
|
) {
|
||||||
|
this.salt = salt;
|
||||||
|
this.saltedPassword = saltedPassword;
|
||||||
|
this.iterations = iterations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] salt() {
|
||||||
|
return salt;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] saltedPassword() {
|
||||||
|
return saltedPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int iterations() {
|
||||||
|
return iterations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UserScramCredentialRecord toRecord(
|
||||||
|
String userName,
|
||||||
|
ScramMechanism mechanism
|
||||||
|
) {
|
||||||
|
return new UserScramCredentialRecord().
|
||||||
|
setName(userName).
|
||||||
|
setMechanism(mechanism.type()).
|
||||||
|
setSalt(salt).
|
||||||
|
setSaltedPassword(saltedPassword).
|
||||||
|
setIterations(iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScramCredential toCredential(
|
||||||
|
ScramMechanism mechanism
|
||||||
|
) throws GeneralSecurityException {
|
||||||
|
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
|
||||||
|
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
|
||||||
|
ScramFormatter formatter = new ScramFormatter(internalMechanism);
|
||||||
|
return new ScramCredential(salt,
|
||||||
|
formatter.storedKey(formatter.clientKey(saltedPassword)),
|
||||||
|
formatter.serverKey(saltedPassword),
|
||||||
|
iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(salt, saltedPassword, iterations);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null) return false;
|
||||||
|
if (!o.getClass().equals(ScramCredentialData.class)) return false;
|
||||||
|
ScramCredentialData other = (ScramCredentialData) o;
|
||||||
|
return Arrays.equals(salt, other.salt) &&
|
||||||
|
Arrays.equals(saltedPassword, other.saltedPassword) &&
|
||||||
|
iterations == other.iterations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ScramCredentialData" +
|
||||||
|
"(salt=" + "[hidden]" +
|
||||||
|
", saltedPassword=" + "[hidden]" +
|
||||||
|
", iterations=" + "[hidden]" +
|
||||||
|
")";
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.image;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents changes to a topic in the metadata image.
|
||||||
|
*/
|
||||||
|
public final class ScramDelta {
|
||||||
|
private final ScramImage image;
|
||||||
|
|
||||||
|
private final Map<ScramMechanism, Map<String, Optional<ScramCredentialData>>> changes = new HashMap<>();
|
||||||
|
|
||||||
|
public ScramDelta(ScramImage image) {
|
||||||
|
this.image = image;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void finishSnapshot() {
|
||||||
|
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : image.mechanisms().entrySet()) {
|
||||||
|
Map<String, Optional<ScramCredentialData>> userNameMap =
|
||||||
|
changes.computeIfAbsent(mechanismEntry.getKey(), __ -> new HashMap<>());
|
||||||
|
for (String userName : mechanismEntry.getValue().keySet()) {
|
||||||
|
if (!userNameMap.containsKey(userName)) {
|
||||||
|
userNameMap.put(userName, Optional.empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScramImage image() {
|
||||||
|
return image;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<ScramMechanism, Map<String, Optional<ScramCredentialData>>> changes() {
|
||||||
|
return changes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replay(UserScramCredentialRecord record) {
|
||||||
|
ScramMechanism mechanism = ScramMechanism.fromType(record.mechanism());
|
||||||
|
Map<String, Optional<ScramCredentialData>> userChanges =
|
||||||
|
changes.computeIfAbsent(mechanism, __ -> new HashMap<>());
|
||||||
|
userChanges.put(record.name(), Optional.of(ScramCredentialData.fromRecord(record)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void replay(RemoveUserScramCredentialRecord record) {
|
||||||
|
ScramMechanism mechanism = ScramMechanism.fromType(record.mechanism());
|
||||||
|
Map<String, Optional<ScramCredentialData>> userChanges =
|
||||||
|
changes.computeIfAbsent(mechanism, __ -> new HashMap<>());
|
||||||
|
userChanges.put(record.name(), Optional.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleMetadataVersionChange(MetadataVersion changedMetadataVersion) {
|
||||||
|
// nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
public ScramImage apply() {
|
||||||
|
Map<ScramMechanism, Map<String, ScramCredentialData>> newMechanisms = new HashMap<>();
|
||||||
|
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : image.mechanisms().entrySet()) {
|
||||||
|
newMechanisms.put(mechanismEntry.getKey(), new HashMap<>(mechanismEntry.getValue()));
|
||||||
|
}
|
||||||
|
for (Entry<ScramMechanism, Map<String, Optional<ScramCredentialData>>> mechanismChangeEntry : changes.entrySet()) {
|
||||||
|
Map<String, ScramCredentialData> userMap =
|
||||||
|
newMechanisms.computeIfAbsent(mechanismChangeEntry.getKey(), __ -> new HashMap<>());
|
||||||
|
for (Entry<String, Optional<ScramCredentialData>> userNameEntry : mechanismChangeEntry.getValue().entrySet()) {
|
||||||
|
if (userNameEntry.getValue().isPresent()) {
|
||||||
|
userMap.put(userNameEntry.getKey(), userNameEntry.getValue().get());
|
||||||
|
} else {
|
||||||
|
userMap.remove(userNameEntry.getKey());
|
||||||
|
if (userMap.isEmpty()) {
|
||||||
|
newMechanisms.remove(mechanismChangeEntry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ScramImage(newMechanisms);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ScramDelta(" +
|
||||||
|
"changes=" + changes +
|
||||||
|
')';
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,178 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.image;
|
||||||
|
|
||||||
|
import org.apache.kafka.image.writer.ImageWriter;
|
||||||
|
import org.apache.kafka.image.writer.ImageWriterOptions;
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
|
||||||
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
|
||||||
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
|
||||||
|
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult;
|
||||||
|
import org.apache.kafka.common.protocol.Errors;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the SCRAM credentials in the metadata image.
|
||||||
|
*
|
||||||
|
* This class is thread-safe.
|
||||||
|
*/
|
||||||
|
public final class ScramImage {
|
||||||
|
public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap());
|
||||||
|
|
||||||
|
private final Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms;
|
||||||
|
|
||||||
|
public ScramImage(Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms) {
|
||||||
|
this.mechanisms = Collections.unmodifiableMap(mechanisms);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(ImageWriter writer, ImageWriterOptions options) {
|
||||||
|
if (options.metadataVersion().isScramSupported()) {
|
||||||
|
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : mechanisms.entrySet()) {
|
||||||
|
for (Entry<String, ScramCredentialData> userEntry : mechanismEntry.getValue().entrySet()) {
|
||||||
|
writer.write(0, userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
boolean isEmpty = true;
|
||||||
|
StringBuffer scramImageString = new StringBuffer("ScramImage({");
|
||||||
|
for (Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismEntry : mechanisms.entrySet()) {
|
||||||
|
if (!mechanismEntry.getValue().isEmpty()) {
|
||||||
|
scramImageString.append(mechanismEntry.getKey() + ":");
|
||||||
|
List<String> users = new ArrayList<>(mechanismEntry.getValue().keySet());
|
||||||
|
scramImageString.append(users.stream().collect(Collectors.joining(", ")));
|
||||||
|
scramImageString.append("},{");
|
||||||
|
isEmpty = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isEmpty) {
|
||||||
|
scramImageString.append("})");
|
||||||
|
options.handleLoss(scramImageString.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String DESCRIBE_DUPLICATE_USER = "Cannot describe SCRAM credentials for the same user twice in a single request: ";
|
||||||
|
private static final String DESCRIBE_USER_THAT_DOES_NOT_EXIST = "Attempt to describe a user credential that does not exist: ";
|
||||||
|
public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
|
||||||
|
|
||||||
|
List<UserName> users = request.users();
|
||||||
|
Map<String, Boolean> uniqueUsers = new HashMap<String, Boolean>();
|
||||||
|
|
||||||
|
if ((users == null) || (users.size() == 0)) {
|
||||||
|
// If there are no users listed then get all the users
|
||||||
|
for (Map<String, ScramCredentialData> scramCredentialDataSet : mechanisms.values()) {
|
||||||
|
for (String user : scramCredentialDataSet.keySet()) {
|
||||||
|
uniqueUsers.put(user, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Filter out duplicates
|
||||||
|
for (UserName user : users) {
|
||||||
|
if (uniqueUsers.containsKey(user.name())) {
|
||||||
|
uniqueUsers.put(user.name(), true);
|
||||||
|
} else {
|
||||||
|
uniqueUsers.put(user.name(), false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData();
|
||||||
|
|
||||||
|
for (Map.Entry<String, Boolean> user : uniqueUsers.entrySet()) {
|
||||||
|
DescribeUserScramCredentialsResult result = new DescribeUserScramCredentialsResult().setUser(user.getKey());
|
||||||
|
|
||||||
|
if (!user.getValue()) {
|
||||||
|
boolean datafound = false;
|
||||||
|
List<CredentialInfo> credentialInfos = new ArrayList<CredentialInfo>();
|
||||||
|
for (Map.Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
|
||||||
|
Map<String, ScramCredentialData> credentialDataSet = mechanismsEntry.getValue();
|
||||||
|
if (credentialDataSet.containsKey(user.getKey())) {
|
||||||
|
credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
|
||||||
|
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
|
||||||
|
datafound = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (datafound) {
|
||||||
|
result.setCredentialInfos(credentialInfos);
|
||||||
|
} else {
|
||||||
|
result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
|
||||||
|
.setErrorMessage(DESCRIBE_USER_THAT_DOES_NOT_EXIST + user.getKey());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
result.setErrorCode(Errors.DUPLICATE_RESOURCE.code())
|
||||||
|
.setErrorMessage(DESCRIBE_DUPLICATE_USER + user.getKey());
|
||||||
|
}
|
||||||
|
retval.results().add(result);
|
||||||
|
}
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<ScramMechanism, Map<String, ScramCredentialData>> mechanisms() {
|
||||||
|
return mechanisms;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return mechanisms.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return mechanisms.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o == null) return false;
|
||||||
|
if (!o.getClass().equals(ScramImage.class)) return false;
|
||||||
|
ScramImage other = (ScramImage) o;
|
||||||
|
return mechanisms.equals(other.mechanisms);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append("ScramImage(");
|
||||||
|
List<ScramMechanism> sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList());
|
||||||
|
String preMechanismComma = "";
|
||||||
|
for (ScramMechanism mechanism : sortedMechanisms) {
|
||||||
|
builder.append(preMechanismComma).append(mechanism).append(": {");
|
||||||
|
Map<String, ScramCredentialData> userMap = mechanisms.get(mechanism);
|
||||||
|
List<String> sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList());
|
||||||
|
String preUserNameComma = "";
|
||||||
|
for (String userName : sortedUserNames) {
|
||||||
|
builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName));
|
||||||
|
preUserNameComma = ", ";
|
||||||
|
}
|
||||||
|
builder.append("}");
|
||||||
|
preMechanismComma = ", ";
|
||||||
|
}
|
||||||
|
builder.append(")");
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
// contributor license agreements. See the NOTICE file distributed with
|
||||||
|
// this work for additional information regarding copyright ownership.
|
||||||
|
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
// (the "License"); you may not use this file except in compliance with
|
||||||
|
// the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
{
|
||||||
|
"apiKey": 22,
|
||||||
|
"type": "metadata",
|
||||||
|
"name": "RemoveUserScramCredentialRecord",
|
||||||
|
"validVersions": "0",
|
||||||
|
"flexibleVersions": "0+",
|
||||||
|
"fields": [
|
||||||
|
{ "name": "Name", "type": "string", "versions": "0+",
|
||||||
|
"about": "The user name." },
|
||||||
|
{ "name": "Mechanism", "type": "int8", "versions": "0+",
|
||||||
|
"about": "The SCRAM mechanism." }
|
||||||
|
]
|
||||||
|
}
|
|
@ -22,15 +22,13 @@
|
||||||
"fields": [
|
"fields": [
|
||||||
{ "name": "Name", "type": "string", "versions": "0+",
|
{ "name": "Name", "type": "string", "versions": "0+",
|
||||||
"about": "The user name." },
|
"about": "The user name." },
|
||||||
{ "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
|
{ "name": "Mechanism", "type": "int8", "versions": "0+",
|
||||||
"about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
|
"about": "The SCRAM mechanism." },
|
||||||
{ "name": "Mechanism", "type": "int8", "versions": "0+",
|
{ "name": "Salt", "type": "bytes", "versions": "0+",
|
||||||
"about": "The SCRAM mechanism." },
|
"about": "A random salt generated by the client." },
|
||||||
{ "name": "Salt", "type": "bytes", "versions": "0+",
|
{ "name": "SaltedPassword", "type": "bytes", "versions": "0+",
|
||||||
"about": "A random salt generated by the client." },
|
"about": "The salted password." },
|
||||||
{ "name": "SaltedPassword", "type": "bytes", "versions": "0+",
|
{ "name": "Iterations", "type": "int32", "versions": "0+",
|
||||||
"about": "The salted password." },
|
"about": "The number of iterations used in the SCRAM credential." }
|
||||||
{ "name": "Iterations", "type": "int32", "versions": "0+",
|
|
||||||
"about": "The number of iterations used in the SCRAM credential." }]}
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,8 @@ public class MetadataImageTest {
|
||||||
ConfigurationsImageTest.IMAGE1,
|
ConfigurationsImageTest.IMAGE1,
|
||||||
ClientQuotasImageTest.IMAGE1,
|
ClientQuotasImageTest.IMAGE1,
|
||||||
ProducerIdsImageTest.IMAGE1,
|
ProducerIdsImageTest.IMAGE1,
|
||||||
AclsImageTest.IMAGE1);
|
AclsImageTest.IMAGE1,
|
||||||
|
ScramImageTest.IMAGE1);
|
||||||
|
|
||||||
DELTA1 = new MetadataDelta.Builder().
|
DELTA1 = new MetadataDelta.Builder().
|
||||||
setImage(IMAGE1).
|
setImage(IMAGE1).
|
||||||
|
@ -55,6 +56,7 @@ public class MetadataImageTest {
|
||||||
RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
|
RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
|
||||||
RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS);
|
RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS);
|
||||||
RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS);
|
RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS);
|
||||||
|
RecordTestUtils.replayAll(DELTA1, ScramImageTest.DELTA1_RECORDS);
|
||||||
|
|
||||||
IMAGE2 = new MetadataImage(
|
IMAGE2 = new MetadataImage(
|
||||||
new MetadataProvenance(200, 5, 4000),
|
new MetadataProvenance(200, 5, 4000),
|
||||||
|
@ -64,7 +66,8 @@ public class MetadataImageTest {
|
||||||
ConfigurationsImageTest.IMAGE2,
|
ConfigurationsImageTest.IMAGE2,
|
||||||
ClientQuotasImageTest.IMAGE2,
|
ClientQuotasImageTest.IMAGE2,
|
||||||
ProducerIdsImageTest.IMAGE2,
|
ProducerIdsImageTest.IMAGE2,
|
||||||
AclsImageTest.IMAGE2);
|
AclsImageTest.IMAGE2,
|
||||||
|
ScramImageTest.IMAGE2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.image;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.ScramMechanism;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
|
||||||
|
import org.apache.kafka.image.writer.ImageWriterOptions;
|
||||||
|
import org.apache.kafka.image.writer.RecordListWriter;
|
||||||
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
|
import org.apache.kafka.server.util.MockRandom;
|
||||||
|
import org.apache.kafka.metadata.RecordTestUtils;
|
||||||
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_256;
|
||||||
|
import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
|
||||||
|
@Timeout(value = 40)
|
||||||
|
public class ScramImageTest {
|
||||||
|
final static ScramImage IMAGE1;
|
||||||
|
|
||||||
|
final static List<ApiMessageAndVersion> DELTA1_RECORDS;
|
||||||
|
|
||||||
|
final static ScramDelta DELTA1;
|
||||||
|
|
||||||
|
final static ScramImage IMAGE2;
|
||||||
|
|
||||||
|
static byte[] randomBuffer(Random random, int length) {
|
||||||
|
byte[] buf = new byte[length];
|
||||||
|
random.nextBytes(buf);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ScramCredentialData randomScramCredentialData(Random random) {
|
||||||
|
return new ScramCredentialData(
|
||||||
|
randomBuffer(random, 1024),
|
||||||
|
randomBuffer(random, 1024),
|
||||||
|
1024 + random.nextInt(1024));
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
MockRandom random = new MockRandom();
|
||||||
|
|
||||||
|
Map<ScramMechanism, Map<String, ScramCredentialData>> image1mechanisms = new HashMap<>();
|
||||||
|
|
||||||
|
Map<String, ScramCredentialData> image1sha256 = new HashMap<>();
|
||||||
|
image1sha256.put("alpha", randomScramCredentialData(random));
|
||||||
|
image1sha256.put("beta", randomScramCredentialData(random));
|
||||||
|
image1mechanisms.put(SCRAM_SHA_256, image1sha256);
|
||||||
|
|
||||||
|
Map<String, ScramCredentialData> image1sha512 = new HashMap<>();
|
||||||
|
image1sha512.put("alpha", randomScramCredentialData(random));
|
||||||
|
image1sha512.put("gamma", randomScramCredentialData(random));
|
||||||
|
image1mechanisms.put(SCRAM_SHA_512, image1sha512);
|
||||||
|
|
||||||
|
IMAGE1 = new ScramImage(image1mechanisms);
|
||||||
|
|
||||||
|
DELTA1_RECORDS = new ArrayList<>();
|
||||||
|
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
|
||||||
|
setName("gamma").
|
||||||
|
setMechanism(SCRAM_SHA_512.type()), (short) 0));
|
||||||
|
ScramCredentialData secondAlpha256Credential = randomScramCredentialData(random);
|
||||||
|
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
|
||||||
|
setName("alpha").
|
||||||
|
setMechanism(SCRAM_SHA_256.type()).
|
||||||
|
setIterations(secondAlpha256Credential.iterations()).
|
||||||
|
setSalt(secondAlpha256Credential.salt()).
|
||||||
|
setSaltedPassword(secondAlpha256Credential.saltedPassword()), (short) 0));
|
||||||
|
DELTA1 = new ScramDelta(IMAGE1);
|
||||||
|
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
|
||||||
|
|
||||||
|
Map<ScramMechanism, Map<String, ScramCredentialData>> image2mechanisms = new HashMap<>();
|
||||||
|
|
||||||
|
Map<String, ScramCredentialData> image2sha256 = new HashMap<>();
|
||||||
|
image2sha256.put("alpha", secondAlpha256Credential);
|
||||||
|
image2sha256.put("beta", image1sha256.get("beta"));
|
||||||
|
image2mechanisms.put(SCRAM_SHA_256, image2sha256);
|
||||||
|
|
||||||
|
Map<String, ScramCredentialData> image2sha512 = new HashMap<>();
|
||||||
|
image2sha512.put("alpha", image1sha512.get("alpha"));
|
||||||
|
image2mechanisms.put(SCRAM_SHA_512, image2sha512);
|
||||||
|
|
||||||
|
IMAGE2 = new ScramImage(image2mechanisms);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyImageRoundTrip() throws Throwable {
|
||||||
|
testToImageAndBack(ScramImage.EMPTY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImage1RoundTrip() throws Throwable {
|
||||||
|
testToImageAndBack(IMAGE1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplyDelta1() throws Throwable {
|
||||||
|
assertEquals(IMAGE2, DELTA1.apply());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImage2RoundTrip() throws Throwable {
|
||||||
|
testToImageAndBack(IMAGE2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testToImageAndBack(ScramImage image) throws Throwable {
|
||||||
|
RecordListWriter writer = new RecordListWriter();
|
||||||
|
image.write(writer, new ImageWriterOptions.Builder().build());
|
||||||
|
ScramDelta delta = new ScramDelta(ScramImage.EMPTY);
|
||||||
|
RecordTestUtils.replayAll(delta, writer.records());
|
||||||
|
ScramImage nextImage = delta.apply();
|
||||||
|
assertEquals(image, nextImage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyWithInvalidIBP() throws Throwable {
|
||||||
|
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
|
||||||
|
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
|
||||||
|
RecordListWriter writer = new RecordListWriter();
|
||||||
|
ScramImage.EMPTY.write(writer, imageWriterOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImage1withInvalidIBP() throws Throwable {
|
||||||
|
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
|
||||||
|
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
|
||||||
|
RecordListWriter writer = new RecordListWriter();
|
||||||
|
try {
|
||||||
|
IMAGE1.write(writer, imageWriterOptions);
|
||||||
|
fail("expected exception writing IMAGE with SCRAM records for MetadataVersion.IBP_3_4_IV0");
|
||||||
|
} catch (Exception expected) {
|
||||||
|
// ignore, expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -167,7 +167,7 @@ public enum MetadataVersion {
|
||||||
// and updates to a handful of RPCs.
|
// and updates to a handful of RPCs.
|
||||||
IBP_3_4_IV0(8, "3.4", "IV0", true),
|
IBP_3_4_IV0(8, "3.4", "IV0", true),
|
||||||
|
|
||||||
// Support for tiered storage (KIP-405)
|
// Support for tiered storage (KIP-405) and SCRAM
|
||||||
IBP_3_5_IV0(9, "3.5", "IV0", false);
|
IBP_3_5_IV0(9, "3.5", "IV0", false);
|
||||||
|
|
||||||
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
|
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
|
||||||
|
@ -250,6 +250,10 @@ public enum MetadataVersion {
|
||||||
return this.isAtLeast(IBP_3_4_IV0);
|
return this.isAtLeast(IBP_3_4_IV0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isScramSupported() {
|
||||||
|
return this.isAtLeast(IBP_3_5_IV0);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isKRaftSupported() {
|
public boolean isKRaftSupported() {
|
||||||
return this.featureLevel > 0;
|
return this.featureLevel > 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue