KAFKA-17587 Refactor test infrastructure (#18602)

This patch reorganizes our test infrastructure into three Gradle modules:

":test-common:test-common-internal-api" is now a minimal dependency which exposes interfaces and annotations only. It has one project dependency on server-common to expose commonly used data classes (MetadataVersion, Feature, etc). Since this pulls in server-common, this module is Java 17+. It cannot be used by ":clients" or other Java 11 modules.

":test-common:test-common-util" includes the auto-quarantined JUnit extension. The @Flaky annotation has been moved here. Since this module has no project dependencies, we can add it to the Java 11 list so that ":clients" and others can utilize the @Flaky annotation

":test-common:test-common-runtime" now includes all of the test infrastructure code (TestKitNodes, etc). This module carries heavy dependencies (core, etc) and so it should not normally be included as a compile-time dependency.

In addition to this reorganization, this patch leverages JUnit SPI service discovery so that modules can utilize the integration test framework without depending on ":core". This will allow us to start moving integration tests out of core and into the appropriate sub-module. This is done by adding ":test-common:test-common-runtime" as a testRuntimeOnly dependency rather than as a testImplementation dependency. A trivial example was added to QuorumControllerTest to illustrate this.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Arthur 2025-01-24 09:03:43 -05:00 committed by GitHub
parent 0c9df75295
commit 8c0a0e07ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
126 changed files with 365 additions and 403 deletions

View File

@ -91,7 +91,10 @@ method_matcher = re.compile(r"([a-zA-Z_$][a-zA-Z0-9_$]+).*")
def clean_test_name(test_name: str) -> str:
cleaned = test_name.strip("\"").rstrip("()")
m = method_matcher.match(cleaned)
return m.group(1)
if m is None:
raise ValueError(f"Could not parse test name '{test_name}'. Expected a valid Java method name.")
else:
return m.group(1)
class TestCatalogExporter:

View File

@ -49,7 +49,7 @@ ext {
gradleVersion = versions.gradle
minClientJavaVersion = 11
minNonClientJavaVersion = 17
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-runtime"]
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams-scala", ":test-common:test-common-util"]
buildVersionFileName = "kafka-version.properties"
@ -139,10 +139,11 @@ ext {
runtimeTestLibs = [
libs.slf4jLog4j2,
libs.junitPlatformLanucher,
project(":test-common:test-common-runtime")
libs.jacksonDatabindYaml,
project(":test-common:test-common-util")
]
log4jRuntimeLibs = [
log4jReleaseLibs = [
libs.slf4jLog4j2,
libs.log4j1Bridge2Api,
libs.jacksonDatabindYaml
@ -1059,7 +1060,7 @@ project(':core') {
}
dependencies {
releaseOnly log4jRuntimeLibs
releaseOnly log4jReleaseLibs
// `core` is often used in users' tests, define the following dependencies as `api` for backwards compatibility
// even though the `core` module doesn't expose any public API
api project(':clients')
@ -1102,8 +1103,9 @@ project(':core') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-util')
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@ -1535,54 +1537,16 @@ project(':group-coordinator') {
srcJar.dependsOn 'processMessages'
}
project(':test-common') {
// Test framework stuff. Implementations that support test-common-api
project(':test-common:test-common-internal-api') {
// Interfaces, config classes, and other test APIs. Java 17 only
base {
archivesName = "kafka-test-common"
archivesName = "kafka-test-common-internal-api"
}
dependencies {
implementation project(':core')
implementation project(':metadata')
implementation project(':server')
implementation project(':raft')
implementation project(':storage')
implementation project(':server-common')
implementation libs.jacksonDatabindYaml
implementation libs.slf4jApi
implementation project(':server-common') // Only project dependency allowed
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
testRuntimeOnly runtimeTestLibs
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-test-common.xml")
}
javadoc {
enabled = false
}
}
project(':test-common:test-common-api') {
// Interfaces, config classes, and other test APIs
base {
archivesName = "kafka-test-common-api"
}
dependencies {
implementation project(':clients')
implementation project(':core')
implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':raft')
implementation project(':server')
implementation project(':server-common')
implementation project(':storage')
implementation project(':test-common')
implementation libs.junitJupiterApi
testImplementation libs.junitJupiter
@ -1593,7 +1557,7 @@ project(':test-common:test-common-api') {
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-test-common-api.xml")
configProperties = checkstyleConfigProperties("import-control-test-common-internal-api.xml")
}
javadoc {
@ -1601,10 +1565,10 @@ project(':test-common:test-common-api') {
}
}
project(':test-common:test-common-runtime') {
// Runtime-only test code including JUnit extentions
project(':test-common:test-common-util') {
// Runtime-only JUnit extensions for entire project. Java 11 only
base {
archivesName = "kafka-test-common-runtime"
archivesName = "kafka-test-common-util"
}
dependencies {
@ -1616,7 +1580,45 @@ project(':test-common:test-common-runtime') {
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-test-common-api.xml")
configProperties = checkstyleConfigProperties("import-control-test-common-util.xml")
}
javadoc {
enabled = false
}
}
project(':test-common:test-common-runtime') {
// Runtime-only JUnit extensions for integration tests. Java 17 only
base {
archivesName = "kafka-test-common-runtime"
}
dependencies {
implementation project(':test-common:test-common-internal-api')
implementation project(':clients')
implementation project(':core')
implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':raft')
implementation project(':server')
implementation project(':server-common')
implementation project(':storage')
implementation libs.junitPlatformLanucher
implementation libs.junitJupiter
implementation libs.jacksonDatabindYaml
implementation libs.slf4jApi
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
testRuntimeOnly runtimeTestLibs
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-test-common-runtime.xml")
}
javadoc {
@ -1644,8 +1646,8 @@ project(':transaction-coordinator') {
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-internal-api')
testRuntimeOnly runtimeTestLibs
@ -1866,6 +1868,7 @@ project(':clients') {
compileOnly libs.jose4j // for SASL/OAUTHBEARER JWT validation; only used by broker
testImplementation project(':test-common:test-common-util')
testImplementation libs.bcpkix
testImplementation libs.jacksonJakartarsJsonProvider
testImplementation libs.jose4j
@ -1880,7 +1883,6 @@ project(':clients') {
testRuntimeOnly libs.jacksonDatabind
testRuntimeOnly libs.jacksonJDK8Datatypes
testRuntimeOnly runtimeTestLibs
testRuntimeOnly log4jRuntimeLibs
generator project(':generator')
}
@ -2267,7 +2269,8 @@ project(':storage') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':test-common:test-common-api')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
@ -2424,7 +2427,7 @@ project(':tools') {
}
dependencies {
releaseOnly log4jRuntimeLibs
releaseOnly log4jReleaseLibs
implementation project(':clients')
implementation project(':metadata')
@ -2456,7 +2459,8 @@ project(':tools') {
testImplementation project(':server').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':test-common:test-common-api')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':connect:api')
@ -2467,7 +2471,6 @@ project(':tools') {
testImplementation project(':streams')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams:integration-tests').sourceSets.test.output
testImplementation project(':test-common')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
@ -2648,7 +2651,6 @@ project(':streams') {
testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs
testRuntimeOnly log4jRuntimeLibs
generator project(':generator')
}
@ -2839,7 +2841,7 @@ project(':streams:integration-tests') {
testImplementation project(':storage')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams:streams-scala')
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':tools')
testImplementation project(':transaction-coordinator')
testImplementation libs.bcpkix
@ -3515,14 +3517,15 @@ project(':connect:runtime') {
testImplementation project(':server')
testImplementation project(':metadata')
testImplementation project(':server-common')
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-util')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server-common')
testImplementation project(':server')
testImplementation project(':group-coordinator')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':test-common:test-common-api')
testImplementation libs.jacksonDatabindYaml
testImplementation libs.junitJupiter
@ -3636,7 +3639,7 @@ project(':connect:file') {
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server-common').sourceSets.test.output
testRuntimeOnly runtimeTestLibs
@ -3740,7 +3743,7 @@ project(':connect:mirror') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':server')
testImplementation project(':server-common').sourceSets.test.output

View File

@ -37,6 +37,7 @@
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.test.api" />
<subpackage name="coordinator">
<subpackage name="common">

View File

@ -36,6 +36,7 @@
<allow pkg="kafka.utils" />
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.test.api" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>

View File

@ -45,6 +45,7 @@
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.test.api" />
<subpackage name="coordinator">
<subpackage name="group">

View File

@ -44,6 +44,7 @@
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.test.api" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />

View File

@ -44,6 +44,7 @@
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.test.api" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />

View File

@ -47,6 +47,7 @@
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.server.config"/>
<allow pkg="org.apache.kafka.common.test.api" />
<!-- protocol, records and request/response utilities -->

View File

@ -40,6 +40,7 @@
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.test.api" />
<subpackage name="coordinator">
<subpackage name="share">

View File

@ -45,6 +45,7 @@
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.test" />
<allow pkg="org.apache.kafka.common.test.api" />
<subpackage name="server">

View File

@ -0,0 +1,43 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.test" />
<!-- things from server-common -->
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="java" />
<allow pkg="javax.security" />
<allow pkg="org.junit" />
<allow pkg="org.slf4j" />
</import-control>

View File

@ -19,10 +19,14 @@
-->
<import-control pkg="org.apache.kafka">
<allow pkg="java" />
<allow pkg="org" />
<!-- allow any import from kafka -->
<allow pkg="org.apache.kafka" />
<allow pkg="kafka" />
<allow pkg="org.slf4j"/>
<allow pkg="org.mockito"/>
<allow pkg="org.junit"/>
<allow pkg="java" />
<allow pkg="scala.jdk.javaapi" />
<allow pkg="javax.security" />
</import-control>

View File

@ -19,8 +19,8 @@
-->
<import-control pkg="org.apache.kafka">
<!-- Only JUnit and slf4j allowed here -->
<allow pkg="java" />
<allow pkg="org" />
<allow pkg="kafka" />
<allow pkg="scala" />
<allow pkg="org.junit" />
<allow pkg="org.slf4j" />
</import-control>

View File

@ -37,6 +37,7 @@
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.test.api" />
<allow pkg="org.slf4j" />
<subpackage name="generated">
<allow pkg="org.apache.kafka.common.protocol" />

View File

@ -46,6 +46,9 @@
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<!-- anyone can use internal test apis -->
<allow pkg="org.apache.kafka.common.test.api" />
<subpackage name="common">
<allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />

View File

@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@ -59,7 +60,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
@ -1435,7 +1435,7 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
@Tag("flaky") // "KAFKA-18310"
@Flaky("KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception {
setupCoordinator();
@ -1472,7 +1472,7 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
@Tag("flaky") // "KAFKA-18310"
@Flaky("KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupReceived() throws Exception {
setupCoordinator();
@ -1506,7 +1506,7 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
@Tag("flaky") // KAFKA-15474 and KAFKA-18310
@Flaky("KAFKA-15474,KAFKA-18310")
@Test
public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception {
setupCoordinator();

View File

@ -27,17 +27,14 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -54,7 +51,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
})
@ExtendWith(ClusterTestExtensions.class)
public class AdminFenceProducersTest {
private static final String TOPIC_NAME = "mytopic";
private static final String TXN_ID = "mytxnid";

View File

@ -37,16 +37,13 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -68,7 +65,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
public class ClientTelemetryTest {
@ClusterTest(

View File

@ -25,16 +25,14 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
@ -71,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ExtendWith(value = ClusterTestExtensions.class)
public class ConfigCommandIntegrationTest {
private final String defaultBrokerId = "0";
private final String defaultGroupName = "group";

View File

@ -30,12 +30,11 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.BrokerState;
@ -44,8 +43,6 @@ import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -64,7 +61,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(types = {Type.KRAFT},
brokers = 3,
serverProperties = {

View File

@ -16,15 +16,12 @@
*/
package kafka.admin;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -37,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("dontUseSystemExit")
@ExtendWith(value = ClusterTestExtensions.class)
public class UserScramCredentialsCommandTest {
private static final String USER1 = "user1";
private static final String USER2 = "user2";

View File

@ -31,16 +31,13 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@ -50,7 +47,6 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(ClusterTestExtensions.class)
public class ConsumerIntegrationTest {
@ClusterTests({

View File

@ -51,18 +51,16 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.Collection;
@ -88,7 +86,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(120)
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(types = {Type.KRAFT})
public class BootstrapControllersIntegrationTest {
private Map<String, Object> adminConfig(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {

View File

@ -31,15 +31,12 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@ -55,7 +52,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
public class LogManagerIntegrationTest {
private final ClusterInstance cluster;

View File

@ -50,11 +50,10 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupConfig;
@ -62,7 +61,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
@ -100,7 +98,6 @@ import static org.junit.jupiter.api.Assertions.fail;
@Timeout(1200)
@Tag("integration")
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),

View File

@ -13,9 +13,7 @@
package kafka.api
import kafka.log.UnifiedLog
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
@ -26,17 +24,16 @@ import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.time.Duration
import java.util.Collections
import java.util.concurrent.TimeUnit
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
@ClusterTest(

View File

@ -25,18 +25,17 @@ import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerReco
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.common.test.TestUtils
import org.apache.kafka.common.test.{ClusterInstance, TestUtils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, assertThrows, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
import java.time.Duration
import java.util
@ -53,7 +52,6 @@ import scala.jdk.CollectionConverters._
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
))
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ProducerIntegrationTest {
@ClusterTests(Array(

View File

@ -20,7 +20,6 @@ package kafka.server
import kafka.log.UnifiedLog
import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
@ -36,6 +35,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
import org.apache.kafka.image.ClusterImage

View File

@ -17,19 +17,16 @@
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTests, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0),

View File

@ -17,9 +17,8 @@
package kafka.server
import org.apache.kafka.common.test.KafkaClusterTestKit
import org.apache.kafka.common.test.TestKitNodes
import kafka.utils.TestUtils
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.server.config.KRaftConfigs

View File

@ -16,7 +16,6 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
@ -24,6 +23,7 @@ import org.apache.kafka.common.message.ApiMessageType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils

View File

@ -18,17 +18,14 @@ package unit.kafka.server
import kafka.network.SocketServer
import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {

View File

@ -17,17 +17,14 @@
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(

View File

@ -17,18 +17,15 @@
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.ClusterTest
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerMetricNamesTest(cluster: ClusterInstance) {
@AfterEach
def tearDown(): Unit = {

View File

@ -17,7 +17,7 @@
package kafka.server
import org.apache.kafka.common.test.api.{ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.{ClusterTest, Type}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData}
@ -26,11 +26,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Feature, MetadataVersion, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import java.util
import java.util.Collections
@ -39,7 +39,6 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
/**
* This test simulates a broker registering with the KRaft quorum under different configurations.
*/
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerRegistrationRequestTest {
def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {

View File

@ -20,23 +20,20 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.ClusterTest
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ClientQuotasRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testAlterClientQuotasRequest(): Unit = {

View File

@ -16,9 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@ -28,18 +26,17 @@ import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, Consum
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api.extension.ExtendWith
import java.lang.{Byte => JByte}
import java.util.Collections
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {

View File

@ -16,7 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
@ -25,15 +25,14 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.extension.ExtendWith
import scala.collection.Map
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(

View File

@ -20,23 +20,20 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{JoinGroupResponseData, ListGroupsResponseData, OffsetFetchResponseData, SyncGroupResponseData}
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.nio.ByteBuffer
import java.util.Collections
import scala.jdk.CollectionConverters._
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(

View File

@ -21,7 +21,6 @@ import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.test.MockController
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
@ -47,6 +46,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.test.MockController
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT

View File

@ -16,17 +16,14 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(

View File

@ -16,19 +16,16 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(

View File

@ -16,19 +16,16 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, DescribeQuorumRequest, DescribeQuorumResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeQuorumRequestTest(cluster: ClusterInstance) {

View File

@ -17,7 +17,6 @@
package kafka.server
import kafka.network.SocketServer
import org.apache.kafka.common.test.api.ClusterInstance
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
@ -29,6 +28,7 @@ import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsTo
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.junit.jupiter.api.Assertions.{assertEquals, fail}

View File

@ -16,23 +16,20 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.SyncGroupRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(

View File

@ -16,19 +16,17 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.{JoinGroupResponseData, SyncGroupRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
@ -36,7 +34,6 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),

View File

@ -18,20 +18,17 @@ package kafka.server
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.LeaveGroupResponseData
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")

View File

@ -16,18 +16,15 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.message.ListGroupsResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(

View File

@ -16,15 +16,12 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {

View File

@ -16,15 +16,12 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(

View File

@ -16,20 +16,16 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {

View File

@ -19,16 +19,16 @@ package kafka.server
import kafka.api.SaslSetup
import kafka.security.JaasTestUtils
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
import org.apache.kafka.common.test.api.{ClusterTemplate, Type, ClusterTestExtensions, ClusterConfig, ClusterInstance}
import org.apache.kafka.common.test.api.{ClusterConfig, ClusterTemplate, Type}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
import java.net.Socket
@ -66,7 +66,6 @@ object SaslApiVersionsRequestTest {
}
@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _

View File

@ -17,22 +17,21 @@
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords
import org.apache.kafka.common.message.{ShareAcknowledgeRequestData, ShareAcknowledgeResponseData, ShareFetchRequestData, ShareFetchResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.requests.{ShareAcknowledgeRequest, ShareAcknowledgeResponse, ShareFetchRequest, ShareFetchResponse, ShareRequestMetadata}
import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util
import java.util.Collections
import scala.jdk.CollectionConverters._
@Timeout(1200)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "")
))

View File

@ -16,9 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.GroupState
import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup
@ -26,6 +24,7 @@ import org.apache.kafka.common.message.{ShareGroupDescribeRequestData, ShareGrou
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ShareGroupDescribeRequest, ShareGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
@ -33,13 +32,11 @@ import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, value = "")
))

View File

@ -16,7 +16,7 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitForAllPartitionsMetadata
import org.apache.kafka.clients.admin.{Admin, NewPartitions}
@ -24,14 +24,13 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.{ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties = Array(
new ClusterConfigProperty(key = "group.share.persister.class.name", value = "")
))

View File

@ -16,24 +16,21 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.SyncGroupRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(

View File

@ -16,21 +16,20 @@
*/
package kafka.server
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, Type}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue, fail}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters.IterableHasAsScala
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),

View File

@ -24,12 +24,12 @@ import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.LogManager
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator

View File

@ -110,8 +110,8 @@ include 'clients',
'tools:tools-api',
'transaction-coordinator',
'trogdor',
'test-common',
'test-common:test-common-api',
'test-common:test-common-internal-api',
'test-common:test-common-util',
'test-common:test-common-runtime'
project(":storage:api").name = "storage-api"

View File

@ -19,10 +19,9 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
@ -33,8 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -54,7 +51,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ClusterTestDefaults(brokers = 3)
@ExtendWith(value = ClusterTestExtensions.class)
public class RemoteLogSegmentLifecycleTest {
private final int segSize = 1048576;

View File

@ -23,17 +23,14 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -52,7 +49,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
private final ClusterInstance clusterInstance;

View File

@ -21,23 +21,19 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
public class TopicBasedRemoteLogMetadataManagerRestartTest {
private final Time time = Time.SYSTEM;

View File

@ -21,10 +21,9 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -32,7 +31,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundExceptio
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.util.Arrays;
@ -50,7 +48,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3)
public class TopicBasedRemoteLogMetadataManagerTest {
private static final int SEG_SIZE = 1048576;

View File

@ -19,17 +19,14 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -44,7 +41,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ExtendWith(ClusterTestExtensions.class)
@ClusterTestDefaults(brokers = 3)
public class RemoteLogMetadataManagerTest {
private final ClusterInstance clusterInstance;

View File

@ -33,10 +33,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
/**
* Represents an immutable requested configuration of a Kafka cluster for integration testing.

View File

@ -30,8 +30,8 @@ import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_CONTROLLER_LISTENER_NAME;
@Documented
@Target({METHOD})

View File

@ -79,7 +79,7 @@ a JUnit extension called `ClusterTestExtensions` which knows how to process thes
invocations. Test classes that wish to make use of these annotations need to explicitly register this extension:
```scala
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.test.junit.ClusterTestExtensions
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ApiVersionsRequestTest {

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
import org.apache.kafka.common.security.auth.SecurityProtocol;
/**
* Constants used by TestKitNodes and ClusterTest annotation defaults
*/
public class TestKitDefaults {
public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
public static final SecurityProtocol DEFAULT_CONTROLLER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_CONTROLLER_LISTENER_NAME = "CONTROLLER";
private TestKitDefaults() {
}
}

View File

@ -16,25 +16,9 @@
*/
package org.apache.kafka.common.test.api;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
/**
* The type of cluster config being requested. Used by {@link org.apache.kafka.common.test.api.ClusterConfig} and the test annotations.
*/
public enum Type {
KRAFT {
@Override
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new RaftClusterInvocationContext(baseDisplayName, config, false);
}
},
CO_KRAFT {
@Override
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new RaftClusterInvocationContext(baseDisplayName, config, true);
}
};
public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config);
KRAFT, CO_KRAFT;
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.common.test.api;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
@ -28,6 +27,7 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -35,10 +35,10 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_BROKER_SECURITY_PROTOCOL;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_CONTROLLER_LISTENER_NAME;
import static org.apache.kafka.common.test.api.TestKitDefaults.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public class ClusterConfigTest {
@ -51,7 +51,8 @@ public class ClusterConfigTest {
@Test
public void testCopy() throws IOException {
File trustStoreFile = TestUtils.tempFile();
File trustStoreFile = Files.createTempFile("kafka", ".tmp").toFile();
trustStoreFile.deleteOnExit();
ClusterConfig clusterConfig = ClusterConfig.builder()
.setTypes(Collections.singleton(Type.KRAFT))

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
package org.apache.kafka.common.test;
import kafka.log.UnifiedLog;
import kafka.network.SocketServer;
@ -45,8 +45,9 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;

View File

@ -20,6 +20,7 @@ package org.apache.kafka.common.test;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
@ -43,13 +44,6 @@ import java.util.stream.Stream;
@SuppressWarnings("NPathComplexity")
public class TestKitNodes {
public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
public static final SecurityProtocol DEFAULT_CONTROLLER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_CONTROLLER_LISTENER_NAME = "CONTROLLER";
public static class Builder {
private boolean combined;
private String clusterId;
@ -69,10 +63,10 @@ public class TestKitNodes {
}
// The broker and controller listener name and SecurityProtocol configurations must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName = ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol = DEFAULT_BROKER_SECURITY_PROTOCOL;
private ListenerName controllerListenerName = ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME);
private SecurityProtocol controllerSecurityProtocol = DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
private ListenerName brokerListenerName = ListenerName.normalised(TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol = TestKitDefaults.DEFAULT_BROKER_SECURITY_PROTOCOL;
private ListenerName controllerListenerName = ListenerName.normalised(TestKitDefaults.DEFAULT_CONTROLLER_LISTENER_NAME);
private SecurityProtocol controllerSecurityProtocol = TestKitDefaults.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
@ -169,11 +163,11 @@ public class TestKitNodes {
clusterId = Uuid.randomUuid().toString();
}
int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;
int controllerId = combined ? TestKitDefaults.BROKER_ID_OFFSET : TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET;
List<Integer> controllerNodeIds = IntStream.range(controllerId, controllerId + numControllerNodes)
.boxed()
.collect(Collectors.toList());
List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, BROKER_ID_OFFSET + numBrokerNodes)
List<Integer> brokerNodeIds = IntStream.range(TestKitDefaults.BROKER_ID_OFFSET, TestKitDefaults.BROKER_ID_OFFSET + numBrokerNodes)
.boxed()
.collect(Collectors.toList());

View File

@ -15,17 +15,19 @@
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
package org.apache.kafka.common.test.junit;
import org.apache.kafka.common.test.ClusterInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.junit.platform.commons.util.AnnotationUtils;
import java.lang.reflect.Executable;
import static org.junit.platform.commons.util.AnnotationUtils.isAnnotated;
/**
* This resolver provides an instance of {@link ClusterInstance} to a test invocation. The instance represents the
@ -57,7 +59,7 @@ public class ClusterInstanceParameterResolver implements ParameterResolver {
} else {
// If we're injecting into a method, make sure it's a test method and not a lifecycle method
Executable parameterizedMethod = parameterContext.getParameter().getDeclaringExecutable();
return isAnnotated(parameterizedMethod, TestTemplate.class);
return AnnotationUtils.isAnnotated(parameterizedMethod, TestTemplate.class);
}
}

View File

@ -14,9 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
package org.apache.kafka.common.test.junit;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.test.api.AutoStart;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.DetectThreadLeak;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.util.timer.SystemTimer;
@ -110,6 +120,13 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
return true;
}
private boolean isClusterTest(ExtensionContext context) {
Method method = context.getRequiredTestMethod();
return method.getDeclaredAnnotation(ClusterTemplate.class) != null ||
method.getDeclaredAnnotation(ClusterTest.class) != null ||
method.getDeclaredAnnotation(ClusterTests.class) != null;
}
@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context) {
ClusterTestDefaults defaults = getClusterTestDefaults(context.getRequiredTestClass());
@ -133,30 +150,29 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot.value(), defaults));
}
if (generatedContexts.isEmpty()) {
throw new IllegalStateException("Please annotate test methods with @ClusterTemplate, @ClusterTest, or " +
"@ClusterTests when using the ClusterTestExtensions provider");
}
return generatedContexts.stream();
}
@Override
public void beforeEach(ExtensionContext context) {
DetectThreadLeak detectThreadLeak = DetectThreadLeak.of(thread ->
if (isClusterTest(context)) {
DetectThreadLeak detectThreadLeak = DetectThreadLeak.of(thread ->
SKIPPED_THREAD_PREFIX.stream().noneMatch(prefix -> thread.getName().startsWith(prefix)));
getStore(context).put(DETECT_THREAD_LEAK_KEY, detectThreadLeak);
getStore(context).put(DETECT_THREAD_LEAK_KEY, detectThreadLeak);
}
}
@Override
public void afterEach(ExtensionContext context) {
DetectThreadLeak detectThreadLeak = getStore(context).remove(DETECT_THREAD_LEAK_KEY, DetectThreadLeak.class);
if (detectThreadLeak == null) {
return;
}
List<Thread> threads = detectThreadLeak.newThreads();
assertTrue(threads.isEmpty(), "Thread leak detected: " +
if (isClusterTest(context)) {
DetectThreadLeak detectThreadLeak = getStore(context).remove(DETECT_THREAD_LEAK_KEY, DetectThreadLeak.class);
if (detectThreadLeak == null) {
return;
}
List<Thread> threads = detectThreadLeak.newThreads();
assertTrue(threads.isEmpty(), "Thread leak detected: " +
threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
}
}
private Store getStore(ExtensionContext context) {
@ -174,6 +190,21 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
return count;
}
private TestTemplateInvocationContext invocationContextForClusterType(
Type type,
String baseDisplayName,
ClusterConfig config
) {
switch (type) {
case KRAFT:
return new RaftClusterInvocationContext(baseDisplayName, config, false);
case CO_KRAFT:
return new RaftClusterInvocationContext(baseDisplayName, config, true);
default:
throw new IllegalArgumentException("Unsupported @Type value " + type);
}
}
List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be empty string.");
@ -184,7 +215,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
List<TestTemplateInvocationContext> contexts = IntStream.range(0, repeatCount)
.mapToObj(__ -> generateClusterConfigurations(context, annot.value()).stream())
.flatMap(Function.identity())
.flatMap(config -> config.clusterTypes().stream().map(type -> type.invocationContexts(baseDisplayName, config)))
.flatMap(config -> config.clusterTypes().stream().map(type -> invocationContextForClusterType(type, baseDisplayName, config)))
.collect(Collectors.toList());
if (contexts.isEmpty()) {
@ -259,7 +290,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.build();
return Arrays.stream(types)
.map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))
.map(type -> invocationContextForClusterType(type, context.getRequiredTestMethod().getName(), config))
.collect(Collectors.toList());
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
package org.apache.kafka.common.test.junit;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
@ -22,9 +22,12 @@ import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.common.test.junit.ClusterTestExtensions

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.common.test.api;
package org.apache.kafka.common.test.junit;
import kafka.server.ControllerServer;
@ -40,14 +40,22 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.AutoStart;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@ -84,7 +92,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
@ExtendWith(ClusterTestExtensions.class)
public class ClusterTestExtensionsTest {
private final ClusterInstance clusterInstance;
@ -371,7 +378,7 @@ public class ClusterTestExtensionsTest {
}
)
public void testSaslPlaintext(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
Assertions.assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());
assertEquals(SecurityProtocol.SASL_PLAINTEXT, clusterInstance.config().brokerSecurityProtocol());
// default ClusterInstance#admin helper with admin credentials
try (Admin admin = clusterInstance.admin()) {

Some files were not shown because too many files have changed in this diff Show More