mirror of https://github.com/apache/kafka.git
KAFKA-14588 UserScramCredentialsCommandTest rewritten in Java (#15832)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>
This commit is contained in:
parent
89d8045a15
commit
cdc4caa578
|
@ -123,6 +123,7 @@
|
|||
<allow pkg="kafka.zk"/>
|
||||
<allow pkg="org.apache.kafka.security"/>
|
||||
<allow pkg="org.apache.kafka.server"/>
|
||||
<allow pkg="org.apache.kafka.test"/>
|
||||
<allow pkg="kafka.test"/>
|
||||
<allow pkg="kafka.test.annotation"/>
|
||||
<allow pkg="kafka.test.junit"/>
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.admin;
|
||||
|
||||
import kafka.test.ClusterInstance;
|
||||
import kafka.test.annotation.ClusterTest;
|
||||
import kafka.test.annotation.ClusterTestDefaults;
|
||||
import kafka.test.annotation.Type;
|
||||
import kafka.test.junit.ClusterTestExtensions;
|
||||
import kafka.utils.Exit;
|
||||
import org.apache.kafka.test.NoRetryException;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import scala.Console;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@SuppressWarnings("dontUseSystemExit")
|
||||
@ExtendWith(value = ClusterTestExtensions.class)
|
||||
@ClusterTestDefaults(clusterType = Type.ALL)
|
||||
public class UserScramCredentialsCommandTest {
|
||||
private static final String USER1 = "user1";
|
||||
private static final String USER2 = "user2";
|
||||
|
||||
private final ClusterInstance cluster;
|
||||
|
||||
public UserScramCredentialsCommandTest(ClusterInstance cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
static class ConfigCommandResult {
|
||||
public final String stdout;
|
||||
public final OptionalInt exitStatus;
|
||||
|
||||
public ConfigCommandResult(String stdout) {
|
||||
this(stdout, OptionalInt.empty());
|
||||
}
|
||||
|
||||
public ConfigCommandResult(String stdout, OptionalInt exitStatus) {
|
||||
this.stdout = stdout;
|
||||
this.exitStatus = exitStatus;
|
||||
}
|
||||
}
|
||||
|
||||
private ConfigCommandResult runConfigCommandViaBroker(String...args) {
|
||||
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
|
||||
String utf8 = StandardCharsets.UTF_8.name();
|
||||
PrintStream printStream;
|
||||
try {
|
||||
printStream = new PrintStream(byteArrayOutputStream, true, utf8);
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
AtomicReference<OptionalInt> exitStatus = new AtomicReference<>(OptionalInt.empty());
|
||||
Exit.setExitProcedure((status, __) -> {
|
||||
exitStatus.set(OptionalInt.of((Integer) status));
|
||||
throw new RuntimeException();
|
||||
});
|
||||
|
||||
List<String> commandArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", cluster.bootstrapServers()));
|
||||
commandArgs.addAll(Arrays.asList(args));
|
||||
try {
|
||||
Console.withOut(printStream, () -> {
|
||||
ConfigCommand.main(commandArgs.toArray(new String[0]));
|
||||
return null;
|
||||
});
|
||||
return new ConfigCommandResult(byteArrayOutputStream.toString(utf8));
|
||||
} catch (Exception e) {
|
||||
return new ConfigCommandResult("", exitStatus.get());
|
||||
} finally {
|
||||
printStream.close();
|
||||
Exit.resetExitProcedure();
|
||||
}
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testUserScramCredentialsRequests() throws Exception {
|
||||
createAndAlterUser(USER1);
|
||||
// now do the same thing for user2
|
||||
createAndAlterUser(USER2);
|
||||
|
||||
// describe both
|
||||
// we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total
|
||||
String quotaPossibilityAOut = quotaMessage(USER1) + quotaMessage(USER2);
|
||||
String quotaPossibilityBOut = quotaMessage(USER2) + quotaMessage(USER1);
|
||||
String scramPossibilityAOut = describeUserMessage(USER1) + describeUserMessage(USER2);
|
||||
String scramPossibilityBOut = describeUserMessage(USER2) + describeUserMessage(USER1);
|
||||
describeUsers(
|
||||
quotaPossibilityAOut + scramPossibilityAOut,
|
||||
quotaPossibilityAOut + scramPossibilityBOut,
|
||||
quotaPossibilityBOut + scramPossibilityAOut,
|
||||
quotaPossibilityBOut + scramPossibilityBOut);
|
||||
|
||||
// now delete configs, in opposite order, for user1 and user2, and describe
|
||||
deleteConfig(USER1, "consumer_byte_rate");
|
||||
deleteConfig(USER2, "SCRAM-SHA-256");
|
||||
describeUsers(quotaMessage(USER2) + describeUserMessage(USER1));
|
||||
|
||||
// now delete the rest of the configs, for user1 and user2, and describe
|
||||
deleteConfig(USER1, "SCRAM-SHA-256");
|
||||
deleteConfig(USER2, "consumer_byte_rate");
|
||||
describeUsers("");
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testAlterWithEmptyPassword() {
|
||||
String user1 = "user1";
|
||||
ConfigCommandResult result = runConfigCommandViaBroker("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]");
|
||||
assertTrue(result.exitStatus.isPresent(), "Expected System.exit() to be called with an empty password");
|
||||
assertEquals(1, result.exitStatus.getAsInt(), "Expected empty password to cause failure with exit status=1");
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testDescribeUnknownUser() {
|
||||
String unknownUser = "unknownUser";
|
||||
ConfigCommandResult result = runConfigCommandViaBroker("--user", unknownUser, "--describe");
|
||||
assertFalse(result.exitStatus.isPresent(), "Expected System.exit() to not be called with an unknown user");
|
||||
assertEquals("", result.stdout);
|
||||
}
|
||||
|
||||
private void createAndAlterUser(String user) throws InterruptedException {
|
||||
// create and describe a credential
|
||||
ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]");
|
||||
assertEquals(updateUserMessage(user), result.stdout);
|
||||
TestUtils.waitForCondition(
|
||||
() -> {
|
||||
try {
|
||||
return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, describeUserMessage(user));
|
||||
} catch (Exception e) {
|
||||
throw new NoRetryException(e);
|
||||
}
|
||||
},
|
||||
() -> "Failed to describe SCRAM credential change '" + user + "'");
|
||||
// create a user quota and describe the user again
|
||||
result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "consumer_byte_rate=20000");
|
||||
assertEquals(updateUserMessage(user), result.stdout);
|
||||
TestUtils.waitForCondition(
|
||||
() -> {
|
||||
try {
|
||||
return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, quotaMessage(user) + describeUserMessage(user));
|
||||
} catch (Exception e) {
|
||||
throw new NoRetryException(e);
|
||||
}
|
||||
},
|
||||
() -> "Failed to describe Quota change for '" + user + "'");
|
||||
}
|
||||
|
||||
private void deleteConfig(String user, String config) {
|
||||
ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--delete-config", config);
|
||||
assertEquals(updateUserMessage(user), result.stdout);
|
||||
}
|
||||
|
||||
private void describeUsers(String... msgs) throws InterruptedException {
|
||||
TestUtils.waitForCondition(
|
||||
() -> {
|
||||
try {
|
||||
String output = runConfigCommandViaBroker("--entity-type", "users", "--describe").stdout;
|
||||
return Arrays.asList(msgs).contains(output);
|
||||
} catch (Exception e) {
|
||||
throw new NoRetryException(e);
|
||||
}
|
||||
},
|
||||
() -> "Failed to describe config");
|
||||
}
|
||||
|
||||
private static String describeUserMessage(String user) {
|
||||
return "SCRAM credential configs for user-principal '" + user + "' are SCRAM-SHA-256=iterations=4096\n";
|
||||
}
|
||||
|
||||
private static String updateUserMessage(String user) {
|
||||
return "Completed updating config for user " + user + ".\n";
|
||||
}
|
||||
|
||||
private static String quotaMessage(String user) {
|
||||
return "Quota configs for user-principal '" + user + "' are consumer_byte_rate=20000.0\n";
|
||||
}
|
||||
}
|
|
@ -1,142 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.admin
|
||||
|
||||
import java.io.{ByteArrayOutputStream, PrintStream}
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import kafka.server.BaseRequestTest
|
||||
import kafka.utils.Exit
|
||||
import kafka.utils.TestUtils
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.ValueSource
|
||||
|
||||
class UserScramCredentialsCommandTest extends BaseRequestTest {
|
||||
override def brokerCount = 1
|
||||
var exitStatus: Option[Int] = None
|
||||
var exitMessage: Option[String] = None
|
||||
|
||||
case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
|
||||
|
||||
private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
|
||||
val byteArrayOutputStream = new ByteArrayOutputStream()
|
||||
val utf8 = StandardCharsets.UTF_8.name
|
||||
val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
|
||||
var exitStatus: Option[Int] = None
|
||||
Exit.setExitProcedure { (status, _) =>
|
||||
exitStatus = Some(status)
|
||||
throw new RuntimeException
|
||||
}
|
||||
val commandArgs = Array("--bootstrap-server", bootstrapServers()) ++ args
|
||||
try {
|
||||
Console.withOut(printStream) {
|
||||
ConfigCommand.main(commandArgs)
|
||||
}
|
||||
ConfigCommandResult(byteArrayOutputStream.toString(utf8))
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
debug(s"Exception running ConfigCommand ${commandArgs.mkString(" ")}", e)
|
||||
ConfigCommandResult("", exitStatus)
|
||||
}
|
||||
} finally {
|
||||
printStream.close
|
||||
Exit.resetExitProcedure()
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft", "zk"))
|
||||
def testUserScramCredentialsRequests(quorum: String): Unit = {
|
||||
val user1 = "user1"
|
||||
// create and describe a credential
|
||||
var result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
|
||||
val alterConfigsUser1Out = s"Completed updating config for user $user1.\n"
|
||||
assertEquals(alterConfigsUser1Out, result.stdout)
|
||||
val scramCredentialConfigsUser1Out = s"SCRAM credential configs for user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n"
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
|
||||
scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential change '$user1'")
|
||||
// create a user quota and describe the user again
|
||||
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "consumer_byte_rate=20000"))
|
||||
assertEquals(alterConfigsUser1Out, result.stdout)
|
||||
val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are consumer_byte_rate=20000.0\n"
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
|
||||
s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user1'")
|
||||
|
||||
// now do the same thing for user2
|
||||
val user2 = "user2"
|
||||
// create and describe a credential
|
||||
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
|
||||
val alterConfigsUser2Out = s"Completed updating config for user $user2.\n"
|
||||
assertEquals(alterConfigsUser2Out, result.stdout)
|
||||
val scramCredentialConfigsUser2Out = s"SCRAM credential configs for user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n"
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
|
||||
scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential change '$user2'")
|
||||
// create a user quota and describe the user again
|
||||
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "consumer_byte_rate=20000"))
|
||||
assertEquals(alterConfigsUser2Out, result.stdout)
|
||||
val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are consumer_byte_rate=20000.0\n"
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
|
||||
s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to describe Quota change for '$user2'")
|
||||
|
||||
// describe both
|
||||
result = runConfigCommandViaBroker(Array("--entity-type", "users", "--describe"))
|
||||
// we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total
|
||||
val quotaPossibilityAOut = s"$quotaConfigsUser1Out$quotaConfigsUser2Out"
|
||||
val quotaPossibilityBOut = s"$quotaConfigsUser2Out$quotaConfigsUser1Out"
|
||||
val scramPossibilityAOut = s"$scramCredentialConfigsUser1Out$scramCredentialConfigsUser2Out"
|
||||
val scramPossibilityBOut = s"$scramCredentialConfigsUser2Out$scramCredentialConfigsUser1Out"
|
||||
assertTrue(result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityAOut")
|
||||
|| result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityBOut")
|
||||
|| result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityAOut")
|
||||
|| result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityBOut"))
|
||||
|
||||
// now delete configs, in opposite order, for user1 and user2, and describe
|
||||
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "consumer_byte_rate"))
|
||||
assertEquals(alterConfigsUser1Out, result.stdout)
|
||||
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "SCRAM-SHA-256"))
|
||||
assertEquals(alterConfigsUser2Out, result.stdout)
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout ==
|
||||
s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user2'")
|
||||
|
||||
// now delete the rest of the configs, for user1 and user2, and describe
|
||||
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "SCRAM-SHA-256"))
|
||||
assertEquals(alterConfigsUser1Out, result.stdout)
|
||||
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "consumer_byte_rate"))
|
||||
assertEquals(alterConfigsUser2Out, result.stdout)
|
||||
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout == "",
|
||||
s"Failed to describe All users deleted")
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft", "zk"))
|
||||
def testAlterWithEmptyPassword(quorum: String): Unit = {
|
||||
val user1 = "user1"
|
||||
val result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"))
|
||||
assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be called with an empty password")
|
||||
assertEquals(1, result.exitStatus.get, "Expected empty password to cause failure with exit status=1")
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft", "zk"))
|
||||
def testDescribeUnknownUser(quorum: String): Unit = {
|
||||
val unknownUser = "unknownUser"
|
||||
val result = runConfigCommandViaBroker(Array("--user", unknownUser, "--describe"))
|
||||
assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be called with an unknown user")
|
||||
assertEquals("", result.stdout)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue