KAFKA-18975 Move clients-integration-test out of core module (#19217)

Move the following tests from the core module to the clients-integration-tests module.

- ClientTelemetryTest
- DeleteTopicTest
- DescribeAuthorizedOperationsTest
- ConsumerIntegrationTest
- CustomQuotaCallbackTest
- RackAwareAutoTopicCreationTest

Move the following tests from the core module to the server module.

- BootstrapControllersIntegrationTest
- LogManagerIntegrationTest

Reviewers: Kirk True <kirk@kirktrue.pro>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
PoAn Yang authored on 2025-03-20 02:43:19 +08:00, committed by GitHub
parent 3a3159b01e
commit fcca4056fd
10 changed files with 25 additions and 45 deletions
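
For context, the moved tests can live outside core because they are written against the JUnit-based ClusterTest framework from the test-common modules rather than core's Scala test harness. A minimal sketch of that pattern follows; the class name ExampleIntegrationTest is hypothetical, and the exact package of ClusterInstance and its admin() factory are assumptions based on the imports visible in the diffs below.

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.test.junit.ClusterTestExtensions;

import org.junit.jupiter.api.extension.ExtendWith;

import static org.junit.jupiter.api.Assertions.assertFalse;

// Hypothetical example; the relocated tests in this commit follow the same shape.
@ExtendWith(ClusterTestExtensions.class)
public class ExampleIntegrationTest {

    // The extension provisions a KRaft cluster and injects a handle to it.
    @ClusterTest(types = {Type.KRAFT}, brokers = 1)
    public void testDescribeCluster(ClusterInstance clusterInstance) throws Exception {
        // admin() is assumed to build an Admin client pointed at the cluster;
        // Admin resolves from the same package (org.apache.kafka.clients.admin) after the move.
        try (Admin admin = clusterInstance.admin()) {
            assertFalse(admin.describeCluster().nodes().get().isEmpty());
        }
    }
}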


@@ -911,6 +911,8 @@ project(':server') {
     testImplementation libs.mockitoCore
     testImplementation libs.junitJupiter
     testImplementation testLog4j2Libs
+    testImplementation project(':test-common:test-common-internal-api')
+    testImplementation project(':test-common:test-common-runtime')
     testRuntimeOnly runtimeTestLibs
   }
@@ -1972,9 +1974,13 @@ project(':clients:clients-integration-tests') {
     testImplementation libs.slf4jApi
     testImplementation project(':test-common:test-common-internal-api')
     testImplementation project(':test-common:test-common-runtime')
+    testImplementation project(':metadata')
+    testImplementation project(':server')
+    testImplementation project(':storage')
+    testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':clients').sourceSets.test.output
     implementation project(':server-common')
     testImplementation project(':metadata')
-    implementation project(':clients').sourceSets.test.output
     implementation project(':group-coordinator')
     implementation project(':transaction-coordinator')


@@ -84,6 +84,7 @@
     <allow pkg="org.apache.kafka.server" />
     <allow pkg="org.apache.kafka.image" />
     <allow pkg="org.apache.kafka.storage.internals.log" />
+    <allow pkg="org.apache.kafka.storage.internals.checkpoint" />
     <subpackage name="metrics">
       <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
       <allow pkg="org.apache.kafka.server.telemetry" />


@@ -15,11 +15,10 @@
  * limitations under the License.
  */
-package kafka.admin;
+package org.apache.kafka.clients.admin;
 
+import kafka.admin.ConfigCommand;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -71,7 +70,7 @@ public class ClientTelemetryTest {
         types = Type.KRAFT,
         brokers = 3,
         serverProperties = {
-            @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
+            @ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$GetIdClientTelemetry"),
         })
     public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
         Map<String, Object> configs = new HashMap<>();
@@ -159,7 +158,7 @@ public class ClientTelemetryTest {
     /**
      * We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the
-     * {@link org.apache.kafka.common.protocol.ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS } command will not be supported
+     * {@link org.apache.kafka.common.protocol.ApiKeys#GET_TELEMETRY_SUBSCRIPTIONS} command will not be supported
      * by the server
      **/
     public static class GetIdClientTelemetry implements ClientTelemetry, MetricsReporter {
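
For reference, the testClientInstanceId case above exercises the KIP-714 client-side API: once a ClientTelemetry-capable MetricsReporter such as GetIdClientTelemetry is registered on the brokers, a client can ask for its broker-assigned instance id. A rough sketch only, with a hypothetical helper class and the bootstrap string assumed to come from the running cluster (e.g. clusterInstance.bootstrapServers()):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Uuid;

import java.time.Duration;
import java.util.Map;

public class ClientInstanceIdExample {
    // Hypothetical helper, not part of the commit.
    public static Uuid fetchClientInstanceId(String bootstrapServers) {
        Map<String, Object> config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(config)) {
            // Without a ClientTelemetry reporter on the brokers, GET_TELEMETRY_SUBSCRIPTIONS
            // is unsupported and this call fails.
            return admin.clientInstanceId(Duration.ofSeconds(10));
        }
    }
}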


@@ -14,15 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.admin;
+package org.apache.kafka.clients.admin;
 
 import kafka.server.KafkaBroker;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewPartitionReassignment;
-import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
@@ -53,8 +48,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import scala.jdk.javaapi.OptionConverters;
-
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -322,8 +315,8 @@ public class DeleteTopicTest {
     private Supplier<Optional<Integer>> isLeaderKnown(Map<Integer, KafkaBroker> idToBroker, TopicPartition topicPartition) {
         return () -> idToBroker.values()
                 .stream()
-                .filter(broker -> OptionConverters.toJava(broker.replicaManager().onlinePartition(topicPartition))
-                        .stream().anyMatch(tp -> tp.leaderIdIfLocal().isDefined()))
+                .filter(broker -> broker.replicaManager().onlinePartition(topicPartition)
+                        .exists(tp -> tp.leaderIdIfLocal().isDefined()))
                 .map(broker -> broker.config().brokerId())
                 .findFirst();
     }
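
The isLeaderKnown change above works because, since Scala 2.12, scala.Function1 is a Java SAM type, so Java code can pass a lambda straight to scala.Option#exists instead of first converting to java.util.Optional via OptionConverters. A small standalone sketch of the two equivalent forms, using illustrative values only:

import scala.Option;
import scala.jdk.javaapi.OptionConverters;

public class ScalaOptionFromJava {
    public static void main(String[] args) {
        Option<String> maybeLeader = Option.apply("broker-1");

        // Old form used by the test: convert to java.util.Optional, then use the Java stream API.
        boolean viaOptional = OptionConverters.toJava(maybeLeader)
                .stream().anyMatch(v -> v.startsWith("broker"));

        // New form used by the test: call the Scala Option API directly with a Java lambda.
        boolean viaExists = maybeLeader.exists(v -> v.startsWith("broker"));

        System.out.println(viaOptional + " " + viaExists);
    }
}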


@@ -15,17 +15,9 @@
  * limitations under the License.
  */
-package kafka.admin;
+package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.DescribeClusterOptions;
-import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
-import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
-import org.apache.kafka.clients.admin.DescribeTopicsOptions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;


@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.admin;
+package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Node;


@@ -14,14 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.clients.consumer;
+package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.GroupProtocol;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;


@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.test.api;
+package org.apache.kafka.server.quota;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -28,9 +28,6 @@ import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.common.test.junit.ClusterTestExtensions;
 import org.apache.kafka.server.config.QuotaConfig;
-import org.apache.kafka.server.quota.ClientQuotaCallback;
-import org.apache.kafka.server.quota.ClientQuotaEntity;
-import org.apache.kafka.server.quota.ClientQuotaType;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -42,9 +39,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 @ClusterTestDefaults(controllers = 3,
     types = {Type.KRAFT},
     serverProperties = {
-        @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        @ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        @ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        @ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
     }
 )
 @ExtendWith(ClusterTestExtensions.class)


@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;


@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;