KAFKA-17856 Move ConfigCommandTest and ConfigCommandIntegrationTest to tool module (#17767)

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-03-09 21:05:36 +08:00 committed by GitHub
parent a5e5e2dcd5
commit d5413fdb48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 39 deletions

View File

@ -334,6 +334,7 @@
<allow pkg="org.apache.kafka.tools.api" />
<allow pkg="org.apache.kafka.tools.filter" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.security" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
package org.apache.kafka.tools;
import kafka.admin.ConfigCommand;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
@ -35,8 +37,6 @@ import org.apache.kafka.test.TestUtils;
import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -62,6 +62,8 @@ import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TI
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.apache.kafka.tools.ToolsTestUtils.captureStandardErr;
import static org.apache.kafka.tools.ToolsTestUtils.captureStandardOut;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -107,7 +109,7 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "users",
"--entity-name", "admin",
"--alter", "--add-config", "consumer_byte_rate=20000"));
String message = captureStandardStream(false, run(command));
String message = captureStandardOut(run(command));
assertEquals("Completed updating config for user admin.", message);
}
@ -117,14 +119,14 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "groups",
"--entity-name", "group",
"--alter", "--add-config", "consumer.session.timeout.ms=50000"));
String message = captureStandardStream(false, run(command));
String message = captureStandardOut(run(command));
assertEquals("Completed updating config for group group.", message);
// Test for the --group alias
command = Stream.concat(quorumArgs(), Stream.of(
"--group", "group",
"--alter", "--add-config", "consumer.session.timeout.ms=50000"));
message = captureStandardStream(false, run(command));
message = captureStandardOut(run(command));
assertEquals("Completed updating config for group group.", message);
}
@ -134,14 +136,14 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "client-metrics",
"--entity-name", "cm",
"--alter", "--add-config", "metrics=org.apache"));
String message = captureStandardStream(false, run(command));
String message = captureStandardOut(run(command));
assertEquals("Completed updating config for client-metric cm.", message);
// Test for the --client-metrics alias
command = Stream.concat(quorumArgs(), Stream.of(
"--client-metrics", "cm",
"--alter", "--add-config", "metrics=org.apache"));
message = captureStandardStream(false, run(command));
message = captureStandardOut(run(command));
assertEquals("Completed updating config for client-metric cm.", message);
}
@ -155,13 +157,13 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "topics",
"--entity-name", "topic",
"--alter", "--add-config", "cleanup.policy=[delete,compact]"));
String message = captureStandardStream(false, run(command));
String message = captureStandardOut(run(command));
assertEquals("Completed updating config for topic topic.", message);
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "topics",
"--entity-name", "topic",
"--describe"));
message = captureStandardStream(false, run(command));
message = captureStandardOut(run(command));
assertTrue(message.contains("cleanup.policy=delete,compact"), "Config entry was not added correctly");
}
}
@ -342,7 +344,7 @@ public class ConfigCommandIntegrationTest {
Stream.of("--bootstrap-server", cluster.bootstrapServers()),
Stream.of(entityOp(brokerIdOrDefault).toArray(new String[0]))),
Stream.of("--entity-type", "brokers", "--describe"));
String describeResult = captureStandardStream(false, run(describeCommand));
String describeResult = captureStandardOut(run(describeCommand));
// We will treat unknown config as sensitive
assertTrue(describeResult.contains("sensitive=true"), describeResult);
@ -434,7 +436,7 @@ public class ConfigCommandIntegrationTest {
throw new RuntimeException();
});
String errOut = captureStandardStream(true, run(args));
String errOut = captureStandardErr(run(args));
checkErrOut.accept(errOut);
assertNotNull(exitStatus.get());
@ -625,26 +627,4 @@ public class ConfigCommandIntegrationTest {
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(","));
}
// Copied from ToolsTestUtils.java, can be removed after we move ConfigCommand to tools module
static String captureStandardStream(boolean isErr, Runnable runnable) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
PrintStream currentStream = isErr ? System.err : System.out;
PrintStream tempStream = new PrintStream(outputStream);
if (isErr)
System.setErr(tempStream);
else
System.setOut(tempStream);
try {
runnable.run();
return outputStream.toString().trim();
} finally {
if (isErr)
System.setErr(currentStream);
else
System.setOut(currentStream);
tempStream.close();
}
}
}

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
package org.apache.kafka.tools;
import kafka.admin.ConfigCommand;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterClientQuotasOptions;

View File

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
package org.apache.kafka.tools;
import kafka.admin.ConfigCommand;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
@ -29,6 +31,7 @@ import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.tools.ToolsTestUtils.captureStandardOut;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -68,9 +71,7 @@ public class UserScramCredentialsCommandTest {
List<String> commandArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", cluster.bootstrapServers()));
commandArgs.addAll(Arrays.asList(args));
try {
String output = ConfigCommandIntegrationTest.captureStandardStream(false, () -> {
ConfigCommand.main(commandArgs.toArray(new String[0]));
});
String output = captureStandardOut(() -> ConfigCommand.main(commandArgs.toArray(new String[0])));
return new ConfigCommandResult(output);
} catch (Exception e) {
return new ConfigCommandResult("", exitStatus.get());