diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7d078121b25..b4f679b004b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -256,6 +256,7 @@
+
diff --git a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
index 70e14830970..8937394ef6e 100644
--- a/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/AlterUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 51,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "AlterUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
index cef8929ac24..2f7a1112c48 100644
--- a/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 50,
"type": "request",
- "listeners": ["zkBroker"],
+ "listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/security/CredentialProvider.scala b/core/src/main/scala/kafka/security/CredentialProvider.scala
index 9aa8bc915d4..85951a526a6 100644
--- a/core/src/main/scala/kafka/security/CredentialProvider.scala
+++ b/core/src/main/scala/kafka/security/CredentialProvider.scala
@@ -18,7 +18,7 @@
package kafka.security
import java.util.{Collection, Properties}
-
+import org.apache.kafka.clients.admin.{ScramMechanism => AdminScramMechanism}
import org.apache.kafka.common.security.authenticator.CredentialCache
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.config.ConfigDef
@@ -42,6 +42,25 @@ class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: De
}
}
}
+
+ def updateCredential(
+ mechanism: AdminScramMechanism,
+ name: String,
+ credential: ScramCredential
+ ): Unit = {
+ val cache = credentialCache.cache(mechanism.mechanismName(), classOf[ScramCredential])
+ cache.put(name, credential)
+ }
+
+ def removeCredentials(
+ mechanism: AdminScramMechanism,
+ name: String
+ ): Unit = {
+ val cache = credentialCache.cache(mechanism.mechanismName(), classOf[ScramCredential])
+ if (cache != null) {
+ cache.remove(name)
+ }
+ }
}
object CredentialProvider {
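
The two new methods mutate the same CredentialCache that the SASL/SCRAM authenticators read at handshake time. A small sketch of that cache's contract, assuming direct use of CredentialCache with placeholder key material:

import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;

public class CredentialCacheSketch {
    public static void main(String[] args) {
        CredentialCache credentialCache = new CredentialCache();
        credentialCache.createCache("SCRAM-SHA-256", ScramCredential.class);
        CredentialCache.Cache<ScramCredential> cache =
            credentialCache.cache("SCRAM-SHA-256", ScramCredential.class);
        // Dummy salt/serverKey/storedKey bytes, for illustration only.
        ScramCredential credential =
            new ScramCredential(new byte[]{1}, new byte[]{2}, new byte[]{3}, 4096);
        cache.put("alice", credential);  // the updateCredential path
        cache.remove("alice");           // the removeCredentials path
    }
}
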
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 76023d51538..232393c4d37 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -457,6 +457,7 @@ class BrokerServer(
dynamicConfigHandlers.toMap,
"broker"),
authorizer,
+ credentialProvider,
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 5a7b5d711eb..0964c07215c 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
+ case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request)
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
@@ -816,6 +817,18 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
+ def handleAlterUserScramCredentials(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val alterRequest = request.body[AlterUserScramCredentialsRequest]
+ authHelper.authorizeClusterOperation(request, ALTER)
+ val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+ OptionalLong.empty())
+ controller.alterUserScramCredentials(context, alterRequest.data)
+ .thenApply[Unit] { response =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new AlterUserScramCredentialsResponse(response.setThrottleTimeMs(requestThrottleMs)))
+ }
+ }
+
def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
val listRequest = request.body[ListPartitionReassignmentsRequest]
authHelper.authorizeClusterOperation(request, DESCRIBE)
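
For context, the handler above is the server side of the public Admin API call path. A hedged client-side sketch (the bootstrap address and passwords are placeholders):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class AlterScramExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ScramCredentialInfo info =
                new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192);
            // One upsertion and one deletion; in KRaft mode the broker forwards
            // these to the controller, landing in handleAlterUserScramCredentials.
            admin.alterUserScramCredentials(Arrays.asList(
                new UserScramCredentialUpsertion("alice", info, "alice-secret"),
                new UserScramCredentialDeletion("bob", ScramMechanism.SCRAM_SHA_512)
            )).all().get();
        }
    }
}
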
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 754b6e323fd..59e7fb022f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3305,17 +3305,23 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
- if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
- val result = zkSupport.adminManager.describeUserScramCredentials(
- Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
- } else {
+ if (!authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ } else {
+ metadataSupport match {
+ case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
+ val result = adminManager.describeUserScramCredentials(
+ Option(describeUserScramCredentialsRequest.data.users).map(_.asScala.map(_.name).toList))
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
+ case RaftSupport(_, metadataCache) =>
+ val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
+ }
}
}
@@ -3627,12 +3633,4 @@ object KafkaApis {
private def unsupported(text: String): Exception = {
new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
}
-
- private def notYetSupported(request: RequestChannel.Request): Exception = {
- notYetSupported(request.header.apiKey().toString)
- }
-
- private def notYetSupported(text: String): Exception = {
- new UnsupportedVersionException(s"Not yet supported when using a Raft-based metadata quorum: $text")
- }
}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 43d79e88601..e7a81f1d1dc 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -21,6 +21,7 @@ import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
+import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
@@ -104,6 +105,7 @@ class BrokerMetadataPublisher(
clientQuotaMetadataManager: ClientQuotaMetadataManager,
var dynamicConfigPublisher: DynamicConfigPublisher,
private val _authorizer: Option[Authorizer],
+ credentialProvider: CredentialProvider,
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler
) extends MetadataPublisher with Logging {
@@ -223,6 +225,21 @@ class BrokerMetadataPublisher(
s"quotas in ${deltaName}", t)
}
+ // Apply changes to SCRAM credentials.
+ Option(delta.scramDelta()).foreach { scramDelta =>
+ scramDelta.changes().forEach {
+ case (mechanism, userChanges) =>
+ userChanges.forEach {
+ case (userName, change) =>
+ if (change.isPresent) {
+ credentialProvider.updateCredential(mechanism, userName, change.get().toCredential(mechanism))
+ } else {
+ credentialProvider.removeCredentials(mechanism, userName)
+ }
+ }
+ }
+ }
+
// Apply changes to ACLs. This needs to be handled carefully because while we are
// applying these changes, the Authorizer is continuing to return authorization
// results in other threads. We never want to expose an invalid state. For example,
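
The scram delta applied above maps mechanism -> user -> Optional credential, where an empty Optional marks a removal. A sketch of walking that structure, assuming ScramDelta.changes() exposes it as plain java.util maps:

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.image.ScramCredentialData;
import org.apache.kafka.image.ScramDelta;

public class ScramDeltaWalk {
    static void apply(ScramDelta scramDelta) {
        for (Map.Entry<ScramMechanism, Map<String, Optional<ScramCredentialData>>> byMechanism :
                scramDelta.changes().entrySet()) {
            ScramMechanism mechanism = byMechanism.getKey();
            byMechanism.getValue().forEach((userName, change) -> {
                if (change.isPresent()) {
                    // Upsertion: refresh the broker-side cached credential.
                } else {
                    // Removal: drop the cached credential for this user.
                }
            });
        }
    }
}
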
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 7e6ad7bfd09..6ebfeb82d01 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -35,6 +35,7 @@ import java.util.concurrent.ThreadLocalRandom
import kafka.admin.BrokerMetadata
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
+import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.apache.kafka.server.common.MetadataVersion
@@ -379,6 +380,10 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
_currentImage.clientQuotas().describe(request)
}
+ def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
+ _currentImage.scram().describe(request)
+ }
+
override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
override def features(): FinalizedFeaturesAndEpoch = {
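
Describe requests are therefore answered from the broker's local metadata image rather than by the controller, which is why the updated tests below wait for propagation. A sketch of the image-side call, assuming ScramImage.describe mirrors the semantics asserted in the tests (a null or empty user list describes everything):

import java.util.Arrays;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.image.ScramImage;

public class DescribeFromImage {
    static DescribeUserScramCredentialsResponseData describeAlice(ScramImage image) {
        DescribeUserScramCredentialsRequestData request =
            new DescribeUserScramCredentialsRequestData()
                .setUsers(Arrays.asList(new UserName().setName("alice")));
        return image.describe(request);
    }
}
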
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index 061e19213e3..bd9dd5a649a 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -124,6 +126,14 @@ public class MockController implements Controller {
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
+ ControllerRequestContext context,
+ AlterUserScramCredentialsRequestData request
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
synchronized public CompletableFuture<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
diff --git a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
index e0121b17c5d..326fbb92775 100644
--- a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
@@ -19,9 +19,11 @@ package kafka.server
import java.nio.charset.StandardCharsets
import java.util
-import java.util.Properties
+import kafka.utils.TestUtils
+import kafka.utils.TestInfoUtils
import kafka.network.SocketServer
import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult
@@ -31,8 +33,11 @@ import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, Alter
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.{Test, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -42,10 +47,25 @@ import scala.jdk.CollectionConverters._
* Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described.
*/
class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
- properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
+
+ protected var testMetadataVersion = MetadataVersion.latest
+ override protected def metadataVersion = testMetadataVersion
+
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ if (TestInfoUtils.isKRaft(testInfo)) {
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+ if (testInfo.getDisplayName().contains("quorum=kraft-IBP_3_4")) {
+ testMetadataVersion = MetadataVersion.IBP_3_4_IV0
+ }
+ } else {
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
+ }
+ this.serverConfig.setProperty(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
+
+ super.setUp(testInfo)
}
private val saltedPasswordBytes = "saltedPassword".getBytes(StandardCharsets.UTF_8)
@@ -54,8 +74,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
private val user2 = "user2"
private val unknownUser = "unknownUser"
- @Test
- def testAlterNothing(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterNothing(quorum: String): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
@@ -66,14 +87,29 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(0, results.size)
}
- @Test
- def testAlterSameThingTwice(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterSameThingTwice(quorum: String): Unit = {
val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
+
+ // Create user1 and user2 first so that the deletions below return DUPLICATE_RESOURCE rather than RESOURCE_NOT_FOUND
+ val initRequests = List(
+ new AlterUserScramCredentialsRequest.Builder(
+ new AlterUserScramCredentialsRequestData()
+ .setDeletions(util.Collections.emptyList())
+ .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(),
+ )
+ initRequests.foreach(request => {
+ val response = sendAlterUserScramCredentialsRequest(request)
+ val results = response.data.results
+ checkNoErrorsAlteringCredentials(results)
+ })
+
val requests = List (
new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
@@ -92,8 +128,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
})
}
- @Test
- def testAlterEmptyUser(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterEmptyUser(quorum: String): Unit = {
val deletionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val upsertionEmpty = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("").setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
.setIterations(4096).setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@@ -120,8 +157,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
})
}
- @Test
- def testAlterUnknownMechanism(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterUnknownMechanism(quorum: String): Unit = {
val deletionUnknown1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.UNKNOWN.`type`)
val deletionValid1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
val deletionUnknown2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user2).setMechanism(10.toByte)
@@ -147,8 +185,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
results.asScala.foreach(result => assertEquals("Unknown SCRAM mechanism", result.errorMessage))
}
- @Test
- def testAlterTooFewIterations(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterTooFewIterations(quorum: String): Unit = {
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(1)
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@@ -163,8 +202,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals("Too few iterations", results.get(0).errorMessage)
}
- @Test
- def testAlterTooManyIterations(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterTooManyIterations(quorum: String): Unit = {
val upsertionTooFewIterations = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
.setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(Integer.MAX_VALUE)
.setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
@@ -179,8 +219,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals("Too many iterations", results.get(0).errorMessage)
}
- @Test
- def testDeleteSomethingThatDoesNotExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testDeleteSomethingThatDoesNotExist(quorum: String): Unit = {
val request = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
@@ -205,10 +246,11 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
checkAllErrorsAlteringCredentials(results, Errors.NOT_CONTROLLER, "when routed incorrectly to a non-Controller broker")
}
- @Test
- def testAlterAndDescribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testAlterAndDescribe(quorum: String): Unit = {
// create a bunch of credentials
- val request1 = new AlterUserScramCredentialsRequest.Builder(
+ val request1_0 = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
.setUpsertions(util.Arrays.asList(
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
@@ -216,6 +258,16 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
.setIterations(4096)
.setSalt(saltBytes)
.setSaltedPassword(saltedPasswordBytes),
+ ))).build()
+ val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
+ assertEquals(1, results1_0.size)
+ checkNoErrorsAlteringCredentials(results1_0)
+ checkUserAppearsInAlterResults(results1_0, user1)
+
+ // A user may be altered only once per request, so user1's second credential is created in a separate request below
+ val request1_1 = new AlterUserScramCredentialsRequest.Builder(
+ new AlterUserScramCredentialsRequestData()
+ .setUpsertions(util.Arrays.asList(
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_512.`type`)
.setIterations(8192)
@@ -227,11 +279,16 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
.setSalt(saltBytes)
.setSaltedPassword(saltedPasswordBytes),
))).build()
- val results1 = sendAlterUserScramCredentialsRequest(request1).data.results
- assertEquals(2, results1.size)
- checkNoErrorsAlteringCredentials(results1)
- checkUserAppearsInAlterResults(results1, user1)
- checkUserAppearsInAlterResults(results1, user2)
+ val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
+ assertEquals(2, results1_1.size)
+ checkNoErrorsAlteringCredentials(results1_1)
+ checkUserAppearsInAlterResults(results1_1, user1)
+ checkUserAppearsInAlterResults(results1_1, user2)
+
+ // KRaft is eventually consistent, so it is possible to call describe before
+ // the credential is propagated from the controller to the broker.
+ TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 2,
+ "describeAllWithNoTopLevelErrorConfirmed does not see 2 users")
// now describe them all
val results2 = describeAllWithNoTopLevelErrorConfirmed().data.results
@@ -290,6 +347,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
checkUserAppearsInAlterResults(results4, user1)
checkUserAppearsInAlterResults(results4, user2)
+ TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 1,
+ "describeAllWithNoTopLevelErrorConfirmed does not see exactly 1 user")
+
// now describe them all, which should just yield 1 credential
val results5 = describeAllWithNoTopLevelErrorConfirmed().data.results
assertEquals(1, results5.size)
@@ -307,16 +367,40 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
checkNoErrorsAlteringCredentials(results6)
checkUserAppearsInAlterResults(results6, user1)
+ TestUtils.waitUntilTrue(() => describeAllWithNoTopLevelErrorConfirmed().data.results.size == 0,
+ "describeAllWithNoTopLevelErrorConfirmed does not see 0 users")
+
// now describe them all, which should yield 0 credentials
val results7 = describeAllWithNoTopLevelErrorConfirmed().data.results
assertEquals(0, results7.size)
}
- private def sendAlterUserScramCredentialsRequest(request: AlterUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): AlterUserScramCredentialsResponse = {
+ /*
+ * Test that altering SCRAM credentials fails on a KRaft cluster whose metadata version is below IBP_3_5
+ */
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft-IBP_3_4"))
+ def testMetadataVersionTooLow(quorum: String): Unit = {
+ val upsertionMetadataVersionTooLow = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName(user1)
+ .setMechanism(ScramMechanism.SCRAM_SHA_256.`type`).setIterations(8192)
+ .setSalt(saltBytes).setSaltedPassword(saltedPasswordBytes)
+ val request = new AlterUserScramCredentialsRequest.Builder(
+ new AlterUserScramCredentialsRequestData()
+ .setDeletions(util.Collections.emptyList())
+ .setUpsertions(util.Arrays.asList(upsertionMetadataVersionTooLow))).build()
+ val response = sendAlterUserScramCredentialsRequest(request)
+ val results = response.data.results
+ assertEquals(1, results.size)
+ checkAllErrorsAlteringCredentials(results, Errors.UNSUPPORTED_VERSION,
+ "when altering the credentials on unsupported IPB version")
+ assertEquals("The current metadata version does not support SCRAM", results.get(0).errorMessage)
+ }
+
+ private def sendAlterUserScramCredentialsRequest(request: AlterUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): AlterUserScramCredentialsResponse = {
connectAndReceive[AlterUserScramCredentialsResponse](request, destination = socketServer)
}
- private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): DescribeUserScramCredentialsResponse = {
+ private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): DescribeUserScramCredentialsResponse = {
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
}
diff --git a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
index 012f83303e7..cc8cef1d7b9 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
@@ -17,9 +17,10 @@
package kafka.server
import java.util
-import java.util.Properties
+import kafka.utils.TestInfoUtils
import kafka.network.SocketServer
import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -27,8 +28,10 @@ import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, De
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.junit.jupiter.api.{Test, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@@ -38,15 +41,23 @@ import scala.jdk.CollectionConverters._
* Testing the API for the case where there are actually credentials to describe is performed elsewhere.
*/
class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ if (TestInfoUtils.isKRaft(testInfo)) {
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName)
+ } else {
+ this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[DescribeCredentialsTest.TestAuthorizer].getName)
- properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[DescribeCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
+ }
+ this.serverConfig.setProperty(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilderReturningAuthorized].getName)
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
+
+ super.setUp(testInfo)
}
- @Test
- def testDescribeNothing(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testDescribeNothing(quorum: String): Unit = {
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData()).build()
val response = sendDescribeUserScramCredentialsRequest(request)
@@ -56,8 +67,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(0, response.data.results.size, "Expected no credentials when describing everything and there are no credentials")
}
- @Test
- def testDescribeWithNull(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testDescribeWithNull(quorum: String): Unit = {
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData().setUsers(null)).build()
val response = sendDescribeUserScramCredentialsRequest(request)
@@ -77,8 +89,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(Errors.NONE.code, error, "Did not expect controller error when routed to non-controller")
}
- @Test
- def testDescribeSameUserTwice(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testDescribeSameUserTwice(quorum: String): Unit = {
val user = "user1"
val userName = new UserName().setName(user)
val request = new DescribeUserScramCredentialsRequest.Builder(
@@ -92,8 +105,9 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(s"Cannot describe SCRAM credentials for the same user twice in a single request: $user", result.errorMessage)
}
- @Test
- def testUnknownUser(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft", "zk"))
+ def testUnknownUser(quorum: String): Unit = {
val unknownUser = "unknownUser"
val request = new DescribeUserScramCredentialsRequest.Builder(
new DescribeUserScramCredentialsRequestData().setUsers(List(new UserName().setName(unknownUser)).asJava)).build()
@@ -106,7 +120,7 @@ class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
assertEquals(s"Attempt to describe a user credential that does not exist: $unknownUser", result.errorMessage)
}
- private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): DescribeUserScramCredentialsResponse = {
+ private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = adminSocketServer): DescribeUserScramCredentialsResponse = {
connectAndReceive[DescribeUserScramCredentialsResponse](request, destination = socketServer)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d2df68f6da9..9685a613970 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -71,7 +71,8 @@ object MetadataCacheTest {
image.configs(),
image.clientQuotas(),
image.producerIds(),
- image.acls())
+ image.acls(),
+ image.scram())
val delta = new MetadataDelta.Builder().setImage(partialImage).build()
def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 91387ea315b..069a7d8e0dd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage}
+import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.OffsetAndEpoch
@@ -4362,7 +4362,8 @@ class ReplicaManagerTest {
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
- AclsImage.EMPTY
+ AclsImage.EMPTY,
+ ScramImage.EMPTY
)
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
index de3c71b04e8..c086c3dd8b2 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.raft.OffsetAndEpoch
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.Test
import java.util
@@ -58,7 +59,7 @@ class BrokerMetadataSnapshotterTest {
new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
)
},
- 1024,
+ 4096,
MemoryPool.NONE,
Time.SYSTEM,
lastContainedLogTime,
@@ -96,6 +97,7 @@ class BrokerMetadataSnapshotterTest {
}
@Test
+ @Timeout(30)
def testCreateSnapshot(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index ed6c5237533..35958d55e69 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -67,6 +69,19 @@ public interface Controller extends AclMutator, AutoCloseable {
AlterPartitionRequestData request
);
+ /**
+ * Alter the user SCRAM credentials.
+ *
+ * @param context The controller request context.
+ * @param request The AlterUserScramCredentialsRequest data.
+ *
+ * @return A future yielding the response.
+ */
+ CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
+ ControllerRequestContext context,
+ AlterUserScramCredentialsRequestData request
+ );
+
/**
* Create a batch of topics.
*
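
Every Controller implementation now has to provide this method; callers get a future that completes once the credential records are committed. A caller-side sketch (mirroring what ControllerApis does, with the response throttling elided):

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;

public class AlterScramCall {
    static CompletableFuture<AlterUserScramCredentialsResponseData> alter(
        Controller controller,
        ControllerRequestContext context,
        AlterUserScramCredentialsRequestData data
    ) {
        return controller.alterUserScramCredentials(context, data)
            .thenApply(response -> {
                // The response would be sent back to the client here,
                // with throttling applied as in ControllerApis.
                return response;
            });
    }
}
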
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
index 60c48312038..0201677bd5f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -144,6 +144,8 @@ final class ControllerMetricsManager {
case PRODUCER_IDS_RECORD:
case ACCESS_CONTROL_ENTRY_RECORD:
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+ case USER_SCRAM_CREDENTIAL_RECORD:
+ case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
case NO_OP_RECORD:
case ZK_MIGRATION_STATE_RECORD:
// These record types do not affect metrics
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 424a4d42685..ff11211e343 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -62,6 +64,8 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
@@ -1409,6 +1413,12 @@ public final class QuorumController implements Controller {
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
break;
+ case USER_SCRAM_CREDENTIAL_RECORD:
+ scramControlManager.replay((UserScramCredentialRecord) message);
+ break;
+ case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
+ scramControlManager.replay((RemoveUserScramCredentialRecord) message);
+ break;
case NO_OP_RECORD:
// NoOpRecord is an empty record and doesn't need to be replayed
break;
@@ -1529,6 +1539,11 @@ public final class QuorumController implements Controller {
*/
private final ReplicationControlManager replicationControl;
+ /**
+ * Manages SCRAM credentials, if there are any.
+ */
+ private final ScramControlManager scramControlManager;
+
/**
* The ClusterMetadataAuthorizer, if one is configured. Note that this will still be
* Optional.empty() if an Authorizer is configured that doesn't use __cluster_metadata.
@@ -1728,6 +1743,10 @@ public final class QuorumController implements Controller {
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
+ this.scramControlManager = new ScramControlManager.Builder().
+ setLogContext(logContext).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
this.authorizer = authorizer;
authorizer.ifPresent(a -> a.setAclMutator(this));
this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
@@ -1762,6 +1781,18 @@ public final class QuorumController implements Controller {
() -> replicationControl.alterPartition(context, request));
}
+ @Override
+ public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
+ ControllerRequestContext context,
+ AlterUserScramCredentialsRequestData request
+ ) {
+ if (request.deletions().isEmpty() && request.upsertions().isEmpty()) {
+ return CompletableFuture.completedFuture(new AlterUserScramCredentialsResponseData());
+ }
+ return appendWriteEvent("alterUserScramCredentials", context.deadlineNs(),
+ () -> scramControlManager.alterCredentials(request, featureControl.metadataVersion()));
+ }
+
@Override
public CompletableFuture<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
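
On the write path the control manager returns the records and the response together, and appendWriteEvent commits them atomically before completing the future. For a successful upsertion, the record appended to the metadata log looks roughly like this sketch (placeholder key material; the record layout is taken from ScramControlManager below):

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class ScramRecordExample {
    static ApiMessageAndVersion upsertRecord() {
        return new ApiMessageAndVersion(new UserScramCredentialRecord()
            .setName("alice")
            .setMechanism(ScramMechanism.SCRAM_SHA_256.type())
            .setSalt(new byte[]{1, 2, 3})            // placeholder salt
            .setSaltedPassword(new byte[]{4, 5, 6})  // placeholder salted password
            .setIterations(8192), (short) 0);
    }
}
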
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
new file mode 100644
index 00000000000..e50a9cd9798
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialDeletion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.common.protocol.Errors.DUPLICATE_RESOURCE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.RESOURCE_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.protocol.Errors.UNACCEPTABLE_CREDENTIAL;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_SASL_MECHANISM;
+
+
+/**
+ * Manages SCRAM credentials.
+ */
+public class ScramControlManager {
+ static final int MAX_ITERATIONS = 16384;
+
+ static class Builder {
+ private LogContext logContext = null;
+ private SnapshotRegistry snapshotRegistry = null;
+
+ Builder setLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ ScramControlManager build() {
+ if (logContext == null) logContext = new LogContext();
+ if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+ return new ScramControlManager(logContext,
+ snapshotRegistry);
+ }
+ }
+
+ static class ScramCredentialKey {
+ private final String username;
+ private final ScramMechanism mechanism;
+
+ ScramCredentialKey(String username, ScramMechanism mechanism) {
+ this.username = username;
+ this.mechanism = mechanism;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(username, mechanism);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) return false;
+ if (!(o.getClass() == this.getClass())) return false;
+ ScramCredentialKey other = (ScramCredentialKey) o;
+ return username.equals(other.username) &&
+ mechanism.equals(other.mechanism);
+ }
+
+ @Override
+ public String toString() {
+ return "ScramCredentialKey" +
+ "(username=" + username +
+ ", mechanism=" + mechanism +
+ ")";
+ }
+ }
+
+ static class ScramCredentialValue {
+ private final byte[] salt;
+ private final byte[] saltedPassword;
+ private final int iterations;
+
+ ScramCredentialValue(
+ byte[] salt,
+ byte[] saltedPassword,
+ int iterations
+ ) {
+ this.salt = salt;
+ this.saltedPassword = saltedPassword;
+ this.iterations = iterations;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(Arrays.hashCode(salt), Arrays.hashCode(saltedPassword), iterations);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) return false;
+ if (!(o.getClass() == this.getClass())) return false;
+ ScramCredentialValue other = (ScramCredentialValue) o;
+ return Arrays.equals(salt, other.salt) &&
+ Arrays.equals(saltedPassword, other.saltedPassword) &&
+ iterations == other.iterations;
+ }
+
+ @Override
+ public String toString() {
+ return "ScramCredentialValue" +
+ "(salt=" + "[hidden]" +
+ ", saltedPassword=" + "[hidden]" +
+ ", iterations=" + "[hidden]" +
+ ")";
+ }
+ }
+
+ private final Logger log;
+ private final TimelineHashMap<ScramCredentialKey, ScramCredentialValue> credentials;
+
+ private ScramControlManager(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.log = logContext.logger(ScramControlManager.class);
+ this.credentials = new TimelineHashMap<>(snapshotRegistry, 0);
+ }
+
+ /*
+ * The MetadataVersion is passed in so that we can return an UNSUPPORTED_VERSION
+ * error to the caller when the current metadata version does not yet support SCRAM.
+ */
+ public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
+ AlterUserScramCredentialsRequestData request,
+ MetadataVersion metadataVersion
+ ) {
+ boolean scramIsSupported = metadataVersion.isScramSupported();
+ Map<String, ScramCredentialDeletion> userToDeletion = new HashMap<>();
+ Map<String, ScramCredentialUpsertion> userToUpsert = new HashMap<>();
+ Map<String, ApiError> userToError = new HashMap<>();
+
+ for (ScramCredentialDeletion deletion : request.deletions()) {
+ if (!userToError.containsKey(deletion.name())) {
+ if (userToDeletion.remove(deletion.name()) != null) {
+ userToError.put(deletion.name(), new ApiError(DUPLICATE_RESOURCE,
+ "A user credential cannot be altered twice in the same request"));
+ } else {
+ if (!scramIsSupported) {
+ userToError.put(deletion.name(), new ApiError(UNSUPPORTED_VERSION,
+ "The current metadata version does not support SCRAM"));
+ } else {
+ ApiError error = validateDeletion(deletion);
+ if (error.isFailure()) {
+ userToError.put(deletion.name(), error);
+ } else {
+ userToDeletion.put(deletion.name(), deletion);
+ }
+ }
+ }
+ }
+ }
+ for (ScramCredentialUpsertion upsertion : request.upsertions()) {
+ if (!userToError.containsKey(upsertion.name())) {
+ if (userToDeletion.remove(upsertion.name()) != null ||
+ userToUpsert.remove(upsertion.name()) != null) {
+ userToError.put(upsertion.name(), new ApiError(DUPLICATE_RESOURCE,
+ "A user credential cannot be altered twice in the same request"));
+ } else {
+ if (!scramIsSupported) {
+ userToError.put(upsertion.name(), new ApiError(UNSUPPORTED_VERSION,
+ "The current metadata version does not support SCRAM"));
+ } else {
+ ApiError error = validateUpsertion(upsertion);
+ if (error.isFailure()) {
+ userToError.put(upsertion.name(), error);
+ } else {
+ userToUpsert.put(upsertion.name(), upsertion);
+ }
+ }
+ }
+ }
+ }
+ AlterUserScramCredentialsResponseData response = new AlterUserScramCredentialsResponseData();
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ for (ScramCredentialDeletion deletion : userToDeletion.values()) {
+ response.results().add(new AlterUserScramCredentialsResult().
+ setUser(deletion.name()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null));
+ records.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
+ setName(deletion.name()).
+ setMechanism(deletion.mechanism()), (short) 0));
+ }
+ for (ScramCredentialUpsertion upsertion : userToUpsert.values()) {
+ response.results().add(new AlterUserScramCredentialsResult().
+ setUser(upsertion.name()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null));
+ records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
+ setName(upsertion.name()).
+ setMechanism(upsertion.mechanism()).
+ setSalt(upsertion.salt()).
+ setSaltedPassword(upsertion.saltedPassword()).
+ setIterations(upsertion.iterations()), (short) 0));
+ }
+ for (Entry<String, ApiError> entry : userToError.entrySet()) {
+ response.results().add(new AlterUserScramCredentialsResult().
+ setUser(entry.getKey()).
+ setErrorCode(entry.getValue().error().code()).
+ setErrorMessage(entry.getValue().message()));
+ }
+ return ControllerResult.atomicOf(records, response);
+ }
+
+ static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) {
+ ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism());
+ ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism);
+ if (error.isFailure()) return error;
+ org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
+ org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
+ if (upsertion.iterations() < internalMechanism.minIterations()) {
+ return new ApiError(UNACCEPTABLE_CREDENTIAL, "Too few iterations");
+ } else if (upsertion.iterations() > MAX_ITERATIONS) {
+ return new ApiError(UNACCEPTABLE_CREDENTIAL, "Too many iterations");
+ }
+ return ApiError.NONE;
+ }
+
+ ApiError validateDeletion(ScramCredentialDeletion deletion) {
+ ApiError error = validateScramUsernameAndMechanism(deletion.name(),
+ ScramMechanism.fromType(deletion.mechanism()));
+ if (error.isFailure()) return error;
+ ScramCredentialKey key = new ScramCredentialKey(deletion.name(),
+ ScramMechanism.fromType(deletion.mechanism()));
+ if (!credentials.containsKey(key)) {
+ return new ApiError(RESOURCE_NOT_FOUND,
+ "Attempt to delete a user credential that does not exist");
+ }
+ return ApiError.NONE;
+ }
+
+ static ApiError validateScramUsernameAndMechanism(
+ String username,
+ ScramMechanism mechanism
+ ) {
+ if (username.isEmpty()) {
+ return new ApiError(UNACCEPTABLE_CREDENTIAL, "Username must not be empty");
+ }
+ if (mechanism == ScramMechanism.UNKNOWN) {
+ return new ApiError(UNSUPPORTED_SASL_MECHANISM, "Unknown SCRAM mechanism");
+ }
+ return ApiError.NONE;
+ }
+
+ public void replay(RemoveUserScramCredentialRecord record) {
+ ScramCredentialKey key = new ScramCredentialKey(record.name(),
+ ScramMechanism.fromType(record.mechanism()));
+ if (credentials.remove(key) == null) {
+ throw new RuntimeException("Unable to find credential to delete: " + key);
+ }
+ log.info("Removed SCRAM credential for {} with mechanism {}.",
+ key.username, key.mechanism);
+ }
+
+ public void replay(UserScramCredentialRecord record) {
+ ScramCredentialKey key = new ScramCredentialKey(record.name(),
+ ScramMechanism.fromType(record.mechanism()));
+ ScramCredentialValue value = new ScramCredentialValue(record.salt(),
+ record.saltedPassword(),
+ record.iterations());
+ if (credentials.put(key, value) == null) {
+ log.info("Created new SCRAM credential for {} with mechanism {}.",
+ key.username, key.mechanism);
+ } else {
+ log.info("Modified SCRAM credential for {} with mechanism {}.",
+ key.username, key.mechanism);
+ }
+ }
+
+ ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) {
+ return new ApiMessageAndVersion(new UserScramCredentialRecord().
+ setName(key.username).
+ setMechanism(key.mechanism.type()).
+ setSalt(value.salt).
+ setSaltedPassword(value.saltedPassword).
+ setIterations(value.iterations),
+ (short) 0);
+ }
+
+}
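
One subtlety worth a worked example: deduplication in alterCredentials is keyed on the user name alone, so two upsertions for the same user fail with DUPLICATE_RESOURCE even when their mechanisms differ. This is why the updated test above creates user1's second credential in a separate request. A sketch of a request that triggers the error:

import java.util.Arrays;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData.ScramCredentialUpsertion;

public class DuplicateAlterExample {
    static AlterUserScramCredentialsRequestData duplicateRequest() {
        ScramCredentialUpsertion sha256 = new ScramCredentialUpsertion()
            .setName("alice").setMechanism(ScramMechanism.SCRAM_SHA_256.type())
            .setIterations(4096).setSalt(new byte[]{1}).setSaltedPassword(new byte[]{2});
        ScramCredentialUpsertion sha512 = new ScramCredentialUpsertion()
            .setName("alice").setMechanism(ScramMechanism.SCRAM_SHA_512.type())
            .setIterations(8192).setSalt(new byte[]{1}).setSaltedPassword(new byte[]{2});
        // alterCredentials reports DUPLICATE_RESOURCE for "alice" because the name
        // appears twice in one request, and neither record is written.
        return new AlterUserScramCredentialsRequestData()
            .setUpsertions(Arrays.asList(sha256, sha512));
    }
}
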
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index ab4fd68f41a..05fc879bfe9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -30,9 +30,11 @@ import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.MetadataVersion;
@@ -72,6 +74,8 @@ public final class MetadataDelta {
private AclsDelta aclsDelta = null;
+ private ScramDelta scramDelta = null;
+
public MetadataDelta(MetadataImage image) {
this.image = image;
}
@@ -145,6 +149,15 @@ public final class MetadataDelta {
return aclsDelta;
}
+ public ScramDelta scramDelta() {
+ return scramDelta;
+ }
+
+ public ScramDelta getOrCreateScramDelta() {
+ if (scramDelta == null) scramDelta = new ScramDelta(image.scram());
+ return scramDelta;
+ }
+
public Optional<MetadataVersion> metadataVersionChanged() {
if (featuresDelta == null) {
return Optional.empty();
@@ -183,6 +196,9 @@ public final class MetadataDelta {
case REMOVE_TOPIC_RECORD:
replay((RemoveTopicRecord) record);
break;
+ case USER_SCRAM_CREDENTIAL_RECORD:
+ replay((UserScramCredentialRecord) record);
+ break;
case FEATURE_LEVEL_RECORD:
replay((FeatureLevelRecord) record);
break;
@@ -201,6 +217,9 @@ public final class MetadataDelta {
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
replay((RemoveAccessControlEntryRecord) record);
break;
+ case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
+ replay((RemoveUserScramCredentialRecord) record);
+ break;
case NO_OP_RECORD:
/* NoOpRecord is an empty record and doesn't need to be replayed beyond
* updating the highest offset and epoch.
@@ -251,6 +270,10 @@ public final class MetadataDelta {
getOrCreateConfigsDelta().replay(record, topicName);
}
+ public void replay(UserScramCredentialRecord record) {
+ getOrCreateScramDelta().replay(record);
+ }
+
public void replay(FeatureLevelRecord record) {
getOrCreateFeaturesDelta().replay(record);
featuresDelta.metadataVersionChange().ifPresent(changedMetadataVersion -> {
@@ -261,6 +284,7 @@ public final class MetadataDelta {
getOrCreateClientQuotasDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateProducerIdsDelta().handleMetadataVersionChange(changedMetadataVersion);
getOrCreateAclsDelta().handleMetadataVersionChange(changedMetadataVersion);
+ getOrCreateScramDelta().handleMetadataVersionChange(changedMetadataVersion);
});
}
@@ -284,6 +308,10 @@ public final class MetadataDelta {
getOrCreateAclsDelta().replay(record);
}
+ public void replay(RemoveUserScramCredentialRecord record) {
+ getOrCreateScramDelta().replay(record);
+ }
+
/**
* Create removal deltas for anything which was in the base image, but which was not
* referenced in the snapshot records we just applied.
@@ -296,6 +324,7 @@ public final class MetadataDelta {
getOrCreateClientQuotasDelta().finishSnapshot();
getOrCreateProducerIdsDelta().finishSnapshot();
getOrCreateAclsDelta().finishSnapshot();
+ getOrCreateScramDelta().finishSnapshot();
}
public MetadataImage apply(MetadataProvenance provenance) {
@@ -341,6 +370,12 @@ public final class MetadataDelta {
} else {
newAcls = aclsDelta.apply();
}
+ ScramImage newScram;
+ if (scramDelta == null) {
+ newScram = image.scram();
+ } else {
+ newScram = scramDelta.apply();
+ }
return new MetadataImage(
provenance,
newFeatures,
@@ -349,7 +384,8 @@ public final class MetadataDelta {
newConfigs,
newClientQuotas,
newProducerIds,
- newAcls
+ newAcls,
+ newScram
);
}
@@ -363,6 +399,7 @@ public final class MetadataDelta {
", clientQuotasDelta=" + clientQuotasDelta +
", producerIdsDelta=" + producerIdsDelta +
", aclsDelta=" + aclsDelta +
+ ", scramDelta=" + scramDelta +
')';
}
}
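
For reviewers, a minimal sketch (not part of the patch) of how the new plumbing behaves: replaying a UserScramCredentialRecord through a MetadataDelta and applying it yields a MetadataImage whose scram() component reflects the change. The salt/saltedPassword arrays and the provenance values are placeholders, and SCRAM_SHA_256 is assumed statically imported.

```java
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
delta.replay(new UserScramCredentialRecord().
    setName("alice").
    setMechanism(SCRAM_SHA_256.type()).
    setSalt(salt).                      // byte[] placeholder
    setSaltedPassword(saltedPassword).  // byte[] placeholder
    setIterations(8192));
MetadataImage image = delta.apply(new MetadataProvenance(100, 1, 1000));
// image.scram() now contains the SCRAM_SHA_256 credential for "alice".
```
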
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 2202b4fe2fe..8643a23e03c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -38,7 +38,8 @@ public final class MetadataImage {
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY,
ProducerIdsImage.EMPTY,
- AclsImage.EMPTY);
+ AclsImage.EMPTY,
+ ScramImage.EMPTY);
private final MetadataProvenance provenance;
@@ -56,6 +57,8 @@ public final class MetadataImage {
private final AclsImage acls;
+ private final ScramImage scram;
+
public MetadataImage(
MetadataProvenance provenance,
FeaturesImage features,
@@ -64,7 +67,8 @@ public final class MetadataImage {
ConfigurationsImage configs,
ClientQuotasImage clientQuotas,
ProducerIdsImage producerIds,
- AclsImage acls
+ AclsImage acls,
+ ScramImage scram
) {
this.provenance = provenance;
this.features = features;
@@ -74,6 +78,7 @@ public final class MetadataImage {
this.clientQuotas = clientQuotas;
this.producerIds = producerIds;
this.acls = acls;
+ this.scram = scram;
}
public boolean isEmpty() {
@@ -83,7 +88,8 @@ public final class MetadataImage {
configs.isEmpty() &&
clientQuotas.isEmpty() &&
producerIds.isEmpty() &&
- acls.isEmpty();
+ acls.isEmpty() &&
+ scram.isEmpty();
}
public MetadataProvenance provenance() {
@@ -126,6 +132,10 @@ public final class MetadataImage {
return acls;
}
+ public ScramImage scram() {
+ return scram;
+ }
+
public void write(ImageWriter writer, ImageWriterOptions options) {
// Features should be written out first so we can include the metadata.version at the beginning of the
// snapshot
@@ -136,6 +146,7 @@ public final class MetadataImage {
clientQuotas.write(writer, options);
producerIds.write(writer, options);
acls.write(writer, options);
+ scram.write(writer, options);
writer.close(true);
}
@@ -150,7 +161,8 @@ public final class MetadataImage {
configs.equals(other.configs) &&
clientQuotas.equals(other.clientQuotas) &&
producerIds.equals(other.producerIds) &&
- acls.equals(other.acls);
+ acls.equals(other.acls) &&
+ scram.equals(other.scram);
}
@Override
@@ -163,7 +175,8 @@ public final class MetadataImage {
configs,
clientQuotas,
producerIds,
- acls);
+ acls,
+ scram);
}
@Override
@@ -177,6 +190,7 @@ public final class MetadataImage {
", clientQuotas=" + clientQuotas +
", producerIdsImage=" + producerIds +
", acls=" + acls +
+ ", scram=" + scram +
")";
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java
new file mode 100644
index 00000000000..ab45046d257
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Represents a SCRAM credential in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramCredentialData {
+ private final byte[] salt;
+ private final byte[] saltedPassword;
+ private final int iterations;
+
+ static ScramCredentialData fromRecord(
+ UserScramCredentialRecord record
+ ) {
+ return new ScramCredentialData(
+ record.salt(),
+ record.saltedPassword(),
+ record.iterations());
+ }
+
+ public ScramCredentialData(
+ byte[] salt,
+ byte[] saltedPassword,
+ int iterations
+ ) {
+ this.salt = salt;
+ this.saltedPassword = saltedPassword;
+ this.iterations = iterations;
+ }
+
+ public byte[] salt() {
+ return salt;
+ }
+
+ public byte[] saltedPassword() {
+ return saltedPassword;
+ }
+
+ public int iterations() {
+ return iterations;
+ }
+
+ public UserScramCredentialRecord toRecord(
+ String userName,
+ ScramMechanism mechanism
+ ) {
+ return new UserScramCredentialRecord().
+ setName(userName).
+ setMechanism(mechanism.type()).
+ setSalt(salt).
+ setSaltedPassword(saltedPassword).
+ setIterations(iterations);
+ }
+
+ public ScramCredential toCredential(
+ ScramMechanism mechanism
+ ) throws GeneralSecurityException {
+ org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
+ org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
+ ScramFormatter formatter = new ScramFormatter(internalMechanism);
+ return new ScramCredential(salt,
+ formatter.storedKey(formatter.clientKey(saltedPassword)),
+ formatter.serverKey(saltedPassword),
+ iterations);
+ }
+
+ @Override
+ public int hashCode() {
+ // Hash the array contents, not the array references, to stay consistent with equals().
+ return Objects.hash(Arrays.hashCode(salt), Arrays.hashCode(saltedPassword), iterations);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) return false;
+ if (!o.getClass().equals(ScramCredentialData.class)) return false;
+ ScramCredentialData other = (ScramCredentialData) o;
+ return Arrays.equals(salt, other.salt) &&
+ Arrays.equals(saltedPassword, other.saltedPassword) &&
+ iterations == other.iterations;
+ }
+
+ @Override
+ public String toString() {
+ return "ScramCredentialData" +
+ "(salt=" + "[hidden]" +
+ ", saltedPassword=" + "[hidden]" +
+ ", iterations=" + "[hidden]" +
+ ")";
+ }
+}
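
A quick sketch (not in the patch) of the intended round trip: derive a salted password with the internal ScramFormatter, wrap it in a ScramCredentialData, and convert it to the ScramCredential form the authenticator cache consumes. The saltedPassword(password, salt, iterations) call is an assumption about ScramFormatter's current API, the imports are those of ScramCredentialData.java above, and the snippet is assumed to run in a method declaring throws GeneralSecurityException.

```java
ScramMechanism mechanism = ScramMechanism.SCRAM_SHA_256;
ScramFormatter formatter = new ScramFormatter(
    org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(
        mechanism.mechanismName()));
byte[] salt = new byte[64];
new java.security.SecureRandom().nextBytes(salt);
// saltedPassword() runs the Hi() key-stretching function; 8192 iterations is a placeholder.
ScramCredentialData data = new ScramCredentialData(
    salt, formatter.saltedPassword("secret", salt, 8192), 8192);
ScramCredential credential = data.toCredential(mechanism);  // derives stored/server keys
```
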
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java
new file mode 100644
index 00000000000..a26b18c7446
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the SCRAM credentials in the metadata image.
+ */
+public final class ScramDelta {
+ private final ScramImage image;
+
+ private final Map&lt;ScramMechanism, Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt;&gt; changes = new HashMap&lt;&gt;();
+
+ public ScramDelta(ScramImage image) {
+ this.image = image;
+ }
+
+ public void finishSnapshot() {
+ for (Entry&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanismEntry : image.mechanisms().entrySet()) {
+ Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt; userNameMap =
+ changes.computeIfAbsent(mechanismEntry.getKey(), __ -> new HashMap<>());
+ for (String userName : mechanismEntry.getValue().keySet()) {
+ if (!userNameMap.containsKey(userName)) {
+ userNameMap.put(userName, Optional.empty());
+ }
+ }
+ }
+ }
+
+ public ScramImage image() {
+ return image;
+ }
+
+ public Map&lt;ScramMechanism, Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt;&gt; changes() {
+ return changes;
+ }
+
+ public void replay(UserScramCredentialRecord record) {
+ ScramMechanism mechanism = ScramMechanism.fromType(record.mechanism());
+ Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt; userChanges =
+ changes.computeIfAbsent(mechanism, __ -> new HashMap<>());
+ userChanges.put(record.name(), Optional.of(ScramCredentialData.fromRecord(record)));
+ }
+
+ public void replay(RemoveUserScramCredentialRecord record) {
+ ScramMechanism mechanism = ScramMechanism.fromType(record.mechanism());
+ Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt; userChanges =
+ changes.computeIfAbsent(mechanism, __ -> new HashMap<>());
+ userChanges.put(record.name(), Optional.empty());
+ }
+
+ public void handleMetadataVersionChange(MetadataVersion changedMetadataVersion) {
+ // nothing to do
+ }
+
+ public ScramImage apply() {
+ Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; newMechanisms = new HashMap&lt;&gt;();
+ for (Entry&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanismEntry : image.mechanisms().entrySet()) {
+ newMechanisms.put(mechanismEntry.getKey(), new HashMap&lt;&gt;(mechanismEntry.getValue()));
+ }
+ for (Entry&lt;ScramMechanism, Map&lt;String, Optional&lt;ScramCredentialData&gt;&gt;&gt; mechanismChangeEntry : changes.entrySet()) {
+ Map&lt;String, ScramCredentialData&gt; userMap =
+ newMechanisms.computeIfAbsent(mechanismChangeEntry.getKey(), __ -> new HashMap&lt;&gt;());
+ for (Entry&lt;String, Optional&lt;ScramCredentialData&gt;&gt; userNameEntry : mechanismChangeEntry.getValue().entrySet()) {
+ if (userNameEntry.getValue().isPresent()) {
+ userMap.put(userNameEntry.getKey(), userNameEntry.getValue().get());
+ } else {
+ userMap.remove(userNameEntry.getKey());
+ }
+ }
+ // Drop an emptied mechanism only after all of its changes have been applied, so a
+ // removal followed by an update for the same mechanism cannot lose the update.
+ if (userMap.isEmpty()) {
+ newMechanisms.remove(mechanismChangeEntry.getKey());
+ }
+ }
+ return new ScramImage(newMechanisms);
+ }
+
+ @Override
+ public String toString() {
+ return "ScramDelta(" +
+ "changes=" + changes +
+ ')';
+ }
+}
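
The delta semantics in brief, as a hedged sketch: an update is recorded as Optional.of(data), a removal as Optional.empty(), and apply() folds the change map over a copy of the base image. Assuming an image that holds only "gamma" under SCRAM_SHA_512:

```java
ScramDelta delta = new ScramDelta(image);
delta.replay(new RemoveUserScramCredentialRecord().
    setName("gamma").
    setMechanism(ScramMechanism.SCRAM_SHA_512.type()));
ScramImage next = delta.apply();
// "gamma" is gone, and because it was the last SCRAM_SHA_512 user, the
// SCRAM_SHA_512 mechanism entry itself has been dropped from next.mechanisms().
```
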
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
new file mode 100644
index 00000000000..97289b19f88
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the SCRAM credentials in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ScramImage {
+ public static final ScramImage EMPTY = new ScramImage(Collections.emptyMap());
+
+ private final Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanisms;
+
+ public ScramImage(Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanisms) {
+ this.mechanisms = Collections.unmodifiableMap(mechanisms);
+ }
+
+ public void write(ImageWriter writer, ImageWriterOptions options) {
+ if (options.metadataVersion().isScramSupported()) {
+ for (Entry&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanismEntry : mechanisms.entrySet()) {
+ for (Entry&lt;String, ScramCredentialData&gt; userEntry : mechanismEntry.getValue().entrySet()) {
+ writer.write(0, userEntry.getValue().toRecord(userEntry.getKey(), mechanismEntry.getKey()));
+ }
+ }
+ } else {
+ boolean isEmpty = true;
+ StringBuilder scramImageString = new StringBuilder("ScramImage({");
+ for (Entry&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanismEntry : mechanisms.entrySet()) {
+ if (!mechanismEntry.getValue().isEmpty()) {
+ scramImageString.append(mechanismEntry.getKey()).append(":");
+ List&lt;String&gt; users = new ArrayList&lt;&gt;(mechanismEntry.getValue().keySet());
+ scramImageString.append(String.join(", ", users));
+ scramImageString.append("},{");
+ isEmpty = false;
+ }
+ }
+
+ if (!isEmpty) {
+ scramImageString.append("})");
+ options.handleLoss(scramImageString.toString());
+ }
+ }
+ }
+
+ private static final String DESCRIBE_DUPLICATE_USER = "Cannot describe SCRAM credentials for the same user twice in a single request: ";
+ private static final String DESCRIBE_USER_THAT_DOES_NOT_EXIST = "Attempt to describe a user credential that does not exist: ";
+ public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
+
+ List&lt;UserName&gt; users = request.users();
+ Map&lt;String, Boolean&gt; uniqueUsers = new HashMap&lt;&gt;();
+
+ if ((users == null) || users.isEmpty()) {
+ // If there are no users listed then get all the users
+ for (Map&lt;String, ScramCredentialData&gt; scramCredentialDataSet : mechanisms.values()) {
+ for (String user : scramCredentialDataSet.keySet()) {
+ uniqueUsers.put(user, false);
+ }
+ }
+ } else {
+ // Filter out duplicates
+ for (UserName user : users) {
+ if (uniqueUsers.containsKey(user.name())) {
+ uniqueUsers.put(user.name(), true);
+ } else {
+ uniqueUsers.put(user.name(), false);
+ }
+ }
+ }
+
+ DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData();
+
+ for (Map.Entry&lt;String, Boolean&gt; user : uniqueUsers.entrySet()) {
+ DescribeUserScramCredentialsResult result = new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+ if (!user.getValue()) {
+ boolean dataFound = false;
+ List&lt;CredentialInfo&gt; credentialInfos = new ArrayList&lt;&gt;();
+ for (Map.Entry&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanismsEntry : mechanisms.entrySet()) {
+ Map&lt;String, ScramCredentialData&gt; credentialDataSet = mechanismsEntry.getValue();
+ if (credentialDataSet.containsKey(user.getKey())) {
+ credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+ .setIterations(credentialDataSet.get(user.getKey()).iterations()));
+ dataFound = true;
+ }
+ }
+ if (dataFound) {
+ result.setCredentialInfos(credentialInfos);
+ } else {
+ result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
+ .setErrorMessage(DESCRIBE_USER_THAT_DOES_NOT_EXIST + user.getKey());
+ }
+ } else {
+ result.setErrorCode(Errors.DUPLICATE_RESOURCE.code())
+ .setErrorMessage(DESCRIBE_DUPLICATE_USER + user.getKey());
+ }
+ retval.results().add(result);
+ }
+ return retval;
+ }
+
+ public Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; mechanisms() {
+ return mechanisms;
+ }
+
+ public boolean isEmpty() {
+ return mechanisms.isEmpty();
+ }
+
+ @Override
+ public int hashCode() {
+ return mechanisms.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) return false;
+ if (!o.getClass().equals(ScramImage.class)) return false;
+ ScramImage other = (ScramImage) o;
+ return mechanisms.equals(other.mechanisms);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("ScramImage(");
+ List&lt;ScramMechanism&gt; sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList());
+ String preMechanismComma = "";
+ for (ScramMechanism mechanism : sortedMechanisms) {
+ builder.append(preMechanismComma).append(mechanism).append(": {");
+ Map&lt;String, ScramCredentialData&gt; userMap = mechanisms.get(mechanism);
+ List&lt;String&gt; sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList());
+ String preUserNameComma = "";
+ for (String userName : sortedUserNames) {
+ builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName));
+ preUserNameComma = ", ";
+ }
+ builder.append("}");
+ preMechanismComma = ", ";
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+}
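
A hedged usage sketch of the describe() contract, matching the error constants above: a user listed twice comes back as DUPLICATE_RESOURCE, an unknown user as RESOURCE_NOT_FOUND, and an empty user list describes every user in the image ('image' is assumed in scope).

```java
DescribeUserScramCredentialsRequestData request = new DescribeUserScramCredentialsRequestData();
request.users().add(new UserName().setName("alpha"));
request.users().add(new UserName().setName("alpha"));  // duplicate on purpose
DescribeUserScramCredentialsResponseData response = image.describe(request);
// The single "alpha" result carries Errors.DUPLICATE_RESOURCE. Note that secrets are
// never returned: each CredentialInfo exposes only the mechanism and iteration count.
```
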
diff --git a/metadata/src/main/resources/common/metadata/RemoveUserScramCredentialRecord.json b/metadata/src/main/resources/common/metadata/RemoveUserScramCredentialRecord.json
new file mode 100644
index 00000000000..1254b2a7efa
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/RemoveUserScramCredentialRecord.json
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 22,
+ "type": "metadata",
+ "name": "RemoveUserScramCredentialRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "Name", "type": "string", "versions": "0+",
+ "about": "The user name." },
+ { "name": "Mechanism", "type": "int8", "versions": "0+",
+ "about": "The SCRAM mechanism." }
+ ]
+}
diff --git a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
index 2f106ff7cde..a24cc53194a 100644
--- a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
+++ b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
@@ -22,15 +22,13 @@
"fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The user name." },
- { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
- "about": "The mechanism and related information associated with the user's SCRAM credential.", "fields": [
- { "name": "Mechanism", "type": "int8", "versions": "0+",
- "about": "The SCRAM mechanism." },
- { "name": "Salt", "type": "bytes", "versions": "0+",
- "about": "A random salt generated by the client." },
- { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
- "about": "The salted password." },
- { "name": "Iterations", "type": "int32", "versions": "0+",
- "about": "The number of iterations used in the SCRAM credential." }]}
+ { "name": "Mechanism", "type": "int8", "versions": "0+",
+ "about": "The SCRAM mechanism." },
+ { "name": "Salt", "type": "bytes", "versions": "0+",
+ "about": "A random salt generated by the client." },
+ { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
+ "about": "The salted password." },
+ { "name": "Iterations", "type": "int32", "versions": "0+",
+ "about": "The number of iterations used in the SCRAM credential." }
]
}
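
The schema change above flattens the record: instead of one UserScramCredentialRecord carrying a CredentialInfos array, each (user, mechanism) pair now gets its own record, which is what lets RemoveUserScramCredentialRecord target a single mechanism. As an illustrative sketch (all values are placeholders), a user with credentials for both mechanisms is now two independent records:

```java
UserScramCredentialRecord sha256 = new UserScramCredentialRecord().
    setName("alice").setMechanism(SCRAM_SHA_256.type()).
    setSalt(salt256).setSaltedPassword(saltedPassword256).setIterations(8192);
UserScramCredentialRecord sha512 = new UserScramCredentialRecord().
    setName("alice").setMechanism(SCRAM_SHA_512.type()).
    setSalt(salt512).setSaltedPassword(saltedPassword512).setIterations(8192);
```
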
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index be21a87bd69..2a6e3e6f3e5 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -43,7 +43,8 @@ public class MetadataImageTest {
ConfigurationsImageTest.IMAGE1,
ClientQuotasImageTest.IMAGE1,
ProducerIdsImageTest.IMAGE1,
- AclsImageTest.IMAGE1);
+ AclsImageTest.IMAGE1,
+ ScramImageTest.IMAGE1);
DELTA1 = new MetadataDelta.Builder().
setImage(IMAGE1).
@@ -55,6 +56,7 @@ public class MetadataImageTest {
RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS);
RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS);
+ RecordTestUtils.replayAll(DELTA1, ScramImageTest.DELTA1_RECORDS);
IMAGE2 = new MetadataImage(
new MetadataProvenance(200, 5, 4000),
@@ -64,7 +66,8 @@ public class MetadataImageTest {
ConfigurationsImageTest.IMAGE2,
ClientQuotasImageTest.IMAGE2,
ProducerIdsImageTest.IMAGE2,
- AclsImageTest.IMAGE2);
+ AclsImageTest.IMAGE2,
+ ScramImageTest.IMAGE2);
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
new file mode 100644
index 00000000000..b96be23cded
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.util.MockRandom;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_256;
+import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_512;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(value = 40)
+public class ScramImageTest {
+ final static ScramImage IMAGE1;
+
+ final static List&lt;ApiMessageAndVersion&gt; DELTA1_RECORDS;
+
+ final static ScramDelta DELTA1;
+
+ final static ScramImage IMAGE2;
+
+ static byte[] randomBuffer(Random random, int length) {
+ byte[] buf = new byte[length];
+ random.nextBytes(buf);
+ return buf;
+ }
+
+ static ScramCredentialData randomScramCredentialData(Random random) {
+ return new ScramCredentialData(
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ 1024 + random.nextInt(1024));
+ }
+
+ static {
+ MockRandom random = new MockRandom();
+
+ Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; image1mechanisms = new HashMap&lt;&gt;();
+
+ Map&lt;String, ScramCredentialData&gt; image1sha256 = new HashMap&lt;&gt;();
+ image1sha256.put("alpha", randomScramCredentialData(random));
+ image1sha256.put("beta", randomScramCredentialData(random));
+ image1mechanisms.put(SCRAM_SHA_256, image1sha256);
+
+ Map&lt;String, ScramCredentialData&gt; image1sha512 = new HashMap&lt;&gt;();
+ image1sha512.put("alpha", randomScramCredentialData(random));
+ image1sha512.put("gamma", randomScramCredentialData(random));
+ image1mechanisms.put(SCRAM_SHA_512, image1sha512);
+
+ IMAGE1 = new ScramImage(image1mechanisms);
+
+ DELTA1_RECORDS = new ArrayList<>();
+ DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveUserScramCredentialRecord().
+ setName("gamma").
+ setMechanism(SCRAM_SHA_512.type()), (short) 0));
+ ScramCredentialData secondAlpha256Credential = randomScramCredentialData(random);
+ DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
+ setName("alpha").
+ setMechanism(SCRAM_SHA_256.type()).
+ setIterations(secondAlpha256Credential.iterations()).
+ setSalt(secondAlpha256Credential.salt()).
+ setSaltedPassword(secondAlpha256Credential.saltedPassword()), (short) 0));
+ DELTA1 = new ScramDelta(IMAGE1);
+ RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+ Map&lt;ScramMechanism, Map&lt;String, ScramCredentialData&gt;&gt; image2mechanisms = new HashMap&lt;&gt;();
+
+ Map&lt;String, ScramCredentialData&gt; image2sha256 = new HashMap&lt;&gt;();
+ image2sha256.put("alpha", secondAlpha256Credential);
+ image2sha256.put("beta", image1sha256.get("beta"));
+ image2mechanisms.put(SCRAM_SHA_256, image2sha256);
+
+ Map&lt;String, ScramCredentialData&gt; image2sha512 = new HashMap&lt;&gt;();
+ image2sha512.put("alpha", image1sha512.get("alpha"));
+ image2mechanisms.put(SCRAM_SHA_512, image2sha512);
+
+ IMAGE2 = new ScramImage(image2mechanisms);
+ }
+
+ @Test
+ public void testEmptyImageRoundTrip() throws Throwable {
+ testToImageAndBack(ScramImage.EMPTY);
+ }
+
+ @Test
+ public void testImage1RoundTrip() throws Throwable {
+ testToImageAndBack(IMAGE1);
+ }
+
+ @Test
+ public void testApplyDelta1() throws Throwable {
+ assertEquals(IMAGE2, DELTA1.apply());
+ }
+
+ @Test
+ public void testImage2RoundTrip() throws Throwable {
+ testToImageAndBack(IMAGE2);
+ }
+
+ private void testToImageAndBack(ScramImage image) throws Throwable {
+ RecordListWriter writer = new RecordListWriter();
+ image.write(writer, new ImageWriterOptions.Builder().build());
+ ScramDelta delta = new ScramDelta(ScramImage.EMPTY);
+ RecordTestUtils.replayAll(delta, writer.records());
+ ScramImage nextImage = delta.apply();
+ assertEquals(image, nextImage);
+ }
+
+ @Test
+ public void testEmptyWithInvalidIBP() throws Throwable {
+ ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
+ setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
+ RecordListWriter writer = new RecordListWriter();
+ ScramImage.EMPTY.write(writer, imageWriterOptions);
+ }
+
+
+ @Test
+ public void testImage1WithInvalidIBP() throws Throwable {
+ ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
+ setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
+ RecordListWriter writer = new RecordListWriter();
+ assertThrows(Exception.class, () -> IMAGE1.write(writer, imageWriterOptions),
+ "expected exception writing IMAGE1 with SCRAM records at MetadataVersion.IBP_3_4_IV0");
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8c8fe202ae8..67d878bc60e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -167,7 +167,7 @@ public enum MetadataVersion {
// and updates to a handful of RPCs.
IBP_3_4_IV0(8, "3.4", "IV0", true),
- // Support for tiered storage (KIP-405)
+ // Support for tiered storage (KIP-405) and SCRAM
IBP_3_5_IV0(9, "3.5", "IV0", false);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
@@ -250,6 +250,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_4_IV0);
}
+ public boolean isScramSupported() {
+ return this.isAtLeast(IBP_3_5_IV0);
+ }
+
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
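
To make the gating concrete, a minimal sketch (placeholder record and message, not from the patch) of the pattern ScramImage.write() follows: SCRAM records are emitted only when the image's metadata version is at least IBP_3_5_IV0; otherwise the writer reports the loss, which raises an exception under the default options, as exercised by the invalid-IBP cases in ScramImageTest above.

```java
if (options.metadataVersion().isScramSupported()) {
    writer.write(0, record);                                  // record: a UserScramCredentialRecord
} else {
    options.handleLoss("SCRAM credentials for user alice");   // throws with default options
}
```
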