diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index a6753636c31..1b445414cd0 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -123,6 +123,7 @@ + diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java new file mode 100644 index 00000000000..eb89b0ec567 --- /dev/null +++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.utils.Exit; +import org.apache.kafka.test.NoRetryException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.Console; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("dontUseSystemExit") +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL) +public class UserScramCredentialsCommandTest { + private static final String USER1 = "user1"; + private static final String USER2 = "user2"; + + private final ClusterInstance cluster; + + public UserScramCredentialsCommandTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + static class ConfigCommandResult { + public final String stdout; + public final OptionalInt exitStatus; + + public ConfigCommandResult(String stdout) { + this(stdout, OptionalInt.empty()); + } + + public ConfigCommandResult(String stdout, OptionalInt exitStatus) { + this.stdout = stdout; + this.exitStatus = exitStatus; + } + } + + private ConfigCommandResult runConfigCommandViaBroker(String...args) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String utf8 = StandardCharsets.UTF_8.name(); + PrintStream printStream; + try { + printStream = new PrintStream(byteArrayOutputStream, true, utf8); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + AtomicReference exitStatus = new AtomicReference<>(OptionalInt.empty()); + Exit.setExitProcedure((status, __) -> { + exitStatus.set(OptionalInt.of((Integer) status)); + throw new RuntimeException(); + }); + + List commandArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", cluster.bootstrapServers())); + commandArgs.addAll(Arrays.asList(args)); + try { + Console.withOut(printStream, () -> { + ConfigCommand.main(commandArgs.toArray(new String[0])); + return null; + }); + return new ConfigCommandResult(byteArrayOutputStream.toString(utf8)); + } catch (Exception e) { + return new ConfigCommandResult("", exitStatus.get()); + } finally { + printStream.close(); + Exit.resetExitProcedure(); + } + } + + @ClusterTest + public void testUserScramCredentialsRequests() throws Exception { + createAndAlterUser(USER1); + // now do the same thing for user2 + createAndAlterUser(USER2); + + // describe both + // we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total + String quotaPossibilityAOut = quotaMessage(USER1) + quotaMessage(USER2); + String quotaPossibilityBOut = quotaMessage(USER2) + quotaMessage(USER1); + String scramPossibilityAOut = describeUserMessage(USER1) + describeUserMessage(USER2); + String scramPossibilityBOut = describeUserMessage(USER2) + describeUserMessage(USER1); + describeUsers( + quotaPossibilityAOut + scramPossibilityAOut, + quotaPossibilityAOut + scramPossibilityBOut, + quotaPossibilityBOut + scramPossibilityAOut, + quotaPossibilityBOut + scramPossibilityBOut); + + // now delete configs, in opposite order, for user1 and user2, and describe + deleteConfig(USER1, "consumer_byte_rate"); + deleteConfig(USER2, "SCRAM-SHA-256"); + describeUsers(quotaMessage(USER2) + describeUserMessage(USER1)); + + // now delete the rest of the configs, for user1 and user2, and describe + deleteConfig(USER1, "SCRAM-SHA-256"); + deleteConfig(USER2, "consumer_byte_rate"); + describeUsers(""); + } + + @ClusterTest + public void testAlterWithEmptyPassword() { + String user1 = "user1"; + ConfigCommandResult result = runConfigCommandViaBroker("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"); + assertTrue(result.exitStatus.isPresent(), "Expected System.exit() to be called with an empty password"); + assertEquals(1, result.exitStatus.getAsInt(), "Expected empty password to cause failure with exit status=1"); + } + + @ClusterTest + public void testDescribeUnknownUser() { + String unknownUser = "unknownUser"; + ConfigCommandResult result = runConfigCommandViaBroker("--user", unknownUser, "--describe"); + assertFalse(result.exitStatus.isPresent(), "Expected System.exit() to not be called with an unknown user"); + assertEquals("", result.stdout); + } + + private void createAndAlterUser(String user) throws InterruptedException { + // create and describe a credential + ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"); + assertEquals(updateUserMessage(user), result.stdout); + TestUtils.waitForCondition( + () -> { + try { + return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, describeUserMessage(user)); + } catch (Exception e) { + throw new NoRetryException(e); + } + }, + () -> "Failed to describe SCRAM credential change '" + user + "'"); + // create a user quota and describe the user again + result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "consumer_byte_rate=20000"); + assertEquals(updateUserMessage(user), result.stdout); + TestUtils.waitForCondition( + () -> { + try { + return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, quotaMessage(user) + describeUserMessage(user)); + } catch (Exception e) { + throw new NoRetryException(e); + } + }, + () -> "Failed to describe Quota change for '" + user + "'"); + } + + private void deleteConfig(String user, String config) { + ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--delete-config", config); + assertEquals(updateUserMessage(user), result.stdout); + } + + private void describeUsers(String... msgs) throws InterruptedException { + TestUtils.waitForCondition( + () -> { + try { + String output = runConfigCommandViaBroker("--entity-type", "users", "--describe").stdout; + return Arrays.asList(msgs).contains(output); + } catch (Exception e) { + throw new NoRetryException(e); + } + }, + () -> "Failed to describe config"); + } + + private static String describeUserMessage(String user) { + return "SCRAM credential configs for user-principal '" + user + "' are SCRAM-SHA-256=iterations=4096\n"; + } + + private static String updateUserMessage(String user) { + return "Completed updating config for user " + user + ".\n"; + } + + private static String quotaMessage(String user) { + return "Quota configs for user-principal '" + user + "' are consumer_byte_rate=20000.0\n"; + } +} diff --git a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala deleted file mode 100644 index 7dd2143ed30..00000000000 --- a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.io.{ByteArrayOutputStream, PrintStream} -import java.nio.charset.StandardCharsets - -import kafka.server.BaseRequestTest -import kafka.utils.Exit -import kafka.utils.TestUtils -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource - -class UserScramCredentialsCommandTest extends BaseRequestTest { - override def brokerCount = 1 - var exitStatus: Option[Int] = None - var exitMessage: Option[String] = None - - case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None) - - private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = { - val byteArrayOutputStream = new ByteArrayOutputStream() - val utf8 = StandardCharsets.UTF_8.name - val printStream = new PrintStream(byteArrayOutputStream, true, utf8) - var exitStatus: Option[Int] = None - Exit.setExitProcedure { (status, _) => - exitStatus = Some(status) - throw new RuntimeException - } - val commandArgs = Array("--bootstrap-server", bootstrapServers()) ++ args - try { - Console.withOut(printStream) { - ConfigCommand.main(commandArgs) - } - ConfigCommandResult(byteArrayOutputStream.toString(utf8)) - } catch { - case e: Exception => { - debug(s"Exception running ConfigCommand ${commandArgs.mkString(" ")}", e) - ConfigCommandResult("", exitStatus) - } - } finally { - printStream.close - Exit.resetExitProcedure() - } - } - - @ParameterizedTest - @ValueSource(strings = Array("kraft", "zk")) - def testUserScramCredentialsRequests(quorum: String): Unit = { - val user1 = "user1" - // create and describe a credential - var result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]")) - val alterConfigsUser1Out = s"Completed updating config for user $user1.\n" - assertEquals(alterConfigsUser1Out, result.stdout) - val scramCredentialConfigsUser1Out = s"SCRAM credential configs for user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n" - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout == - scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential change '$user1'") - // create a user quota and describe the user again - result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "consumer_byte_rate=20000")) - assertEquals(alterConfigsUser1Out, result.stdout) - val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are consumer_byte_rate=20000.0\n" - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout == - s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user1'") - - // now do the same thing for user2 - val user2 = "user2" - // create and describe a credential - result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]")) - val alterConfigsUser2Out = s"Completed updating config for user $user2.\n" - assertEquals(alterConfigsUser2Out, result.stdout) - val scramCredentialConfigsUser2Out = s"SCRAM credential configs for user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n" - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout == - scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential change '$user2'") - // create a user quota and describe the user again - result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "consumer_byte_rate=20000")) - assertEquals(alterConfigsUser2Out, result.stdout) - val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are consumer_byte_rate=20000.0\n" - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout == - s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to describe Quota change for '$user2'") - - // describe both - result = runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")) - // we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total - val quotaPossibilityAOut = s"$quotaConfigsUser1Out$quotaConfigsUser2Out" - val quotaPossibilityBOut = s"$quotaConfigsUser2Out$quotaConfigsUser1Out" - val scramPossibilityAOut = s"$scramCredentialConfigsUser1Out$scramCredentialConfigsUser2Out" - val scramPossibilityBOut = s"$scramCredentialConfigsUser2Out$scramCredentialConfigsUser1Out" - assertTrue(result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityAOut") - || result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityBOut") - || result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityAOut") - || result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityBOut")) - - // now delete configs, in opposite order, for user1 and user2, and describe - result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "consumer_byte_rate")) - assertEquals(alterConfigsUser1Out, result.stdout) - result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "SCRAM-SHA-256")) - assertEquals(alterConfigsUser2Out, result.stdout) - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout == - s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user2'") - - // now delete the rest of the configs, for user1 and user2, and describe - result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "SCRAM-SHA-256")) - assertEquals(alterConfigsUser1Out, result.stdout) - result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "consumer_byte_rate")) - assertEquals(alterConfigsUser2Out, result.stdout) - TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout == "", - s"Failed to describe All users deleted") - } - - @ParameterizedTest - @ValueSource(strings = Array("kraft", "zk")) - def testAlterWithEmptyPassword(quorum: String): Unit = { - val user1 = "user1" - val result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]")) - assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be called with an empty password") - assertEquals(1, result.exitStatus.get, "Expected empty password to cause failure with exit status=1") - } - - @ParameterizedTest - @ValueSource(strings = Array("kraft", "zk")) - def testDescribeUnknownUser(quorum: String): Unit = { - val unknownUser = "unknownUser" - val result = runConfigCommandViaBroker(Array("--user", unknownUser, "--describe")) - assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be called with an unknown user") - assertEquals("", result.stdout) - } -}