KAFKA-19324 Make org.apache.kafka.common.test.TestUtils package-private to prevent cross-module access (#19884)

Description

* Replace `org.apache.kafka.common.test.TestUtils` with
`org.apache.kafka.test.TestUtils` in outer package modules to
standardize test utility usage
* Move `waitUntilLeaderIsElectedOrChangedWithAdmin` method from
`org.apache.kafka.test.TestUtils` to `ClusterInstance` and refactor for
better code organization
* Add `org.apache.kafka.test.TestUtils` dependency to
`transaction-coordinator` import control

Reviewers: PoAn Yang [payang@apache.org](mailto:payang@apache.org), Ken
 Huang  [s7133700@gmail.com](mailto:s7133700@gmail.com), Ken Huang
 [s7133700@gmail.com](mailto:s7133700@gmail.com), Chia-Ping Tsai
 [chia7712@gmail.com](mailto:chia7712@gmail.com)
This commit is contained in:
Bolin Lin 2025-06-22 22:47:40 +08:00 committed by GitHub
parent 583acb60d6
commit 3404f65cdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 128 additions and 105 deletions

View File

@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.test.api" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.slf4j" />
<subpackage name="generated">
<allow pkg="org.apache.kafka.common.protocol" />

View File

@ -24,11 +24,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.List;

View File

@ -33,7 +33,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;

View File

@ -19,11 +19,11 @@ package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import java.util.Map;

View File

@ -34,7 +34,6 @@ import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
@ -47,6 +46,7 @@ import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;

View File

@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -38,6 +37,7 @@ import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -37,6 +36,7 @@ import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
import java.util.Collection;
@ -202,6 +202,7 @@ public class DeleteTopicTest {
waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas for topic " + DEFAULT_TOPIC + " not created.");
}
}
@ClusterTest
public void testDeleteNonExistingTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.admin()) {
@ -220,7 +221,7 @@ public class DeleteTopicTest {
cluster.waitForTopic(topic, 0);
waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas for topic test not created.");
TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, DEFAULT_TOPIC, 0, 1000);
cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, DEFAULT_TOPIC, 0, 1000);
}
}

View File

@ -28,12 +28,12 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

View File

@ -31,12 +31,12 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.Collection;

View File

@ -23,12 +23,12 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;

View File

@ -22,11 +22,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;

View File

@ -26,11 +26,11 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.List;

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -36,6 +35,7 @@ import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;

View File

@ -44,7 +44,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -53,6 +52,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.test.TestUtils;
import java.net.InetAddress;
import java.time.Duration;

View File

@ -26,11 +26,11 @@ import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.test.TestUtils;
import java.util.LinkedHashMap;
import java.util.List;

View File

@ -31,10 +31,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
import org.apache.kafka.common.test.{ClusterInstance, TestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, assertThrows, assertTrue}
import java.time.Duration

View File

@ -43,7 +43,6 @@ import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import org.apache.kafka.common.test.{TestUtils => JTestUtils}
import scala.jdk.CollectionConverters._
@ -592,8 +591,17 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString))
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listener == "PLAINTEXT").map(JTestUtils.endpointToString))
assertEndpointsEqual(new Endpoint("SSL", SecurityProtocol.SSL, "localhost", 9092),
config.listeners.find(_.listener == "SSL").getOrElse(fail("SSL endpoint not found")))
assertEndpointsEqual( new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9091),
config.listeners.find(_.listener == "PLAINTEXT").getOrElse(fail("PLAINTEXT endpoint not found")))
}
private def assertEndpointsEqual(expected: Endpoint, actual: Endpoint): Unit = {
assertEquals(expected.host(), actual.host(), "Host mismatch")
assertEquals(expected.port(), actual.port(), "Port mismatch")
assertEquals(expected.listener(), actual.listener(), "Listener mismatch")
assertEquals(expected.securityProtocol(), actual.securityProtocol(), "Security protocol mismatch")
}
private def listenerListToEndPoints(listenerList: String,
@ -1186,7 +1194,8 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(defaults)
assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
assertEndpointsEqual(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "127.0.0.1", 1122),
config.effectiveAdvertisedBrokerListeners.head)
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)

View File

@ -17,8 +17,8 @@
package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -41,6 +40,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
@ -554,7 +555,7 @@ class RemoteTopicCrudTest {
}
private void verifyRemoteLogTopicConfigs(Map<String, String> topicConfig) throws Exception {
TestUtils.waitForCondition(() -> {
TestCondition condition = () -> {
var logBuffer = cluster.brokers().values()
.stream()
.map(broker -> broker.logManager().getLog(new TopicPartition(testTopicName, 0), false))
@ -613,7 +614,8 @@ class RemoteTopicCrudTest {
}
}
return result;
}, "Failed to update topic config $topicConfig" + topicConfig);
};
TestUtils.waitForCondition(condition, "Failed to update topic config $topicConfig" + topicConfig);
}

View File

@ -29,10 +29,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;

View File

@ -36,10 +36,13 @@ import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@ -65,6 +68,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -407,10 +411,54 @@ public interface ClusterInstance {
}
}
/**
* Wait for a leader to be elected or changed using the provided admin client.
*/
default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
String topic,
int partitionNumber,
long timeoutMs) throws Exception {
long startTime = System.currentTimeMillis();
TopicPartition topicPartition = new TopicPartition(topic, partitionNumber);
while (System.currentTimeMillis() < startTime + timeoutMs) {
try {
TopicDescription topicDescription = admin.describeTopics(List.of(topic))
.allTopicNames().get().get(topic);
Optional<Integer> leader = topicDescription.partitions().stream()
.filter(partitionInfo -> partitionInfo.partition() == partitionNumber)
.findFirst()
.map(partitionInfo -> {
int leaderId = partitionInfo.leader().id();
return leaderId == Node.noNode().id() ? null : leaderId;
});
if (leader.isPresent()) {
return leader.get();
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof UnknownTopicOrPartitionException ||
cause instanceof LeaderNotAvailableException) {
continue;
} else {
throw e;
}
}
TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMs));
}
throw new AssertionError("Timing out after " + timeoutMs +
" ms since a leader was not elected for partition " + topicPartition);
}
default List<Integer> boundPorts() {
return brokers().values().stream()
.map(KafkaBroker::socketServer)
.map(s -> s.boundPort(clientListener()))
.toList();
}
}

View File

@ -16,13 +16,7 @@
*/
package org.apache.kafka.common.test;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@ -32,18 +26,17 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import static java.lang.String.format;
/**
* Helper functions for writing unit tests
* Helper functions for writing unit tests.
* <p>
* <b>Package-private:</b> Not intended for use outside {@code org.apache.kafka.common.test}.
*/
public class TestUtils {
class TestUtils {
private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
/* A consistent random number generator to make tests repeatable */
@ -103,18 +96,6 @@ public class TestUtils {
return file;
}
/**
* Convert EndPoint to String
*/
public static String endpointToString(Endpoint endPoint) {
String host = endPoint.host();
int port = endPoint.port();
ListenerName listenerName = ListenerName.normalised(endPoint.listener());
String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
return listenerName.value() + "://" + hostport;
}
/**
* uses default value of 15 seconds for timeout
*/
@ -164,51 +145,4 @@ public class TestUtils {
String conditionDetails) throws InterruptedException {
waitForCondition(testCondition, maxWaitMs, () -> conditionDetails);
}
public static int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
String topic,
int partitionNumber,
long timeoutMs) throws Exception {
BiFunction<String, Integer, Optional<Integer>> getPartitionLeader = (t, p) -> {
try {
return Optional.ofNullable(getLeaderFromAdmin(admin, t, p));
} catch (Exception e) {
throw new RuntimeException(e);
}
};
return doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partitionNumber, timeoutMs);
}
private static Integer getLeaderFromAdmin(Admin admin, String topic, int partition) throws Exception {
TopicDescription topicDescription = admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic);
return topicDescription.partitions().stream()
.filter(partitionInfo -> partitionInfo.partition() == partition)
.findFirst()
.map(partitionInfo -> partitionInfo.leader().id() == Node.noNode().id() ? null : partitionInfo.leader().id())
.orElse(null);
}
private static int doWaitUntilLeaderIsElectedOrChanged(BiFunction<String, Integer, Optional<Integer>> getPartitionLeader,
String topic,
int partition,
long timeoutMs) throws Exception {
long startTime = System.currentTimeMillis();
TopicPartition topicPartition = new TopicPartition(topic, partition);
Optional<Integer> electedLeader = Optional.empty();
while (electedLeader.isEmpty() && System.currentTimeMillis() < startTime + timeoutMs) {
Optional<Integer> leader = getPartitionLeader.apply(topic, partition);
if (leader.isPresent()) {
log.trace("Leader {} is elected for partition {}", leader.get(), topicPartition);
electedLeader = leader;
} else {
log.trace("Leader for partition {} is not elected yet", topicPartition);
}
Thread.sleep(Math.min(timeoutMs, 100L));
}
Optional<Integer> finalLeader = electedLeader;
return electedLeader.orElseThrow(() -> new AssertionError("Timing out after " + timeoutMs
+ " ms since a leader was not elected for partition " + topicPartition + ", leader is " + finalLeader));
}
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.common.test.junit;
import org.apache.kafka.common.test.ClusterInstance;
import org.junit.jupiter.api.TestTemplate;

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
@ -50,6 +49,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -71,6 +71,35 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
private final boolean isCombined;
/**
* Wait for condition to be met for at most 15 seconds and throw assertion failure otherwise.
* This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
* without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
* avoid transient failures due to slow or overloaded machines.
*/
static void waitForCondition(final java.util.function.Supplier<Boolean> testCondition,
final String conditionDetails) throws InterruptedException {
var maxWaitMs = 15_000L;
long endTime = System.currentTimeMillis() + maxWaitMs;
while (System.currentTimeMillis() < endTime) {
try {
if (testCondition.get()) {
return;
}
} catch (Exception e) {
if (System.currentTimeMillis() >= endTime) {
throw new AssertionError(String.format("Assertion failed with an exception after %s ms", maxWaitMs), e);
}
}
if (System.currentTimeMillis() < endTime) {
TimeUnit.MILLISECONDS.sleep(100);
}
}
throw new AssertionError("Condition not met: " + conditionDetails);
}
public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) {
this.baseDisplayName = baseDisplayName;
this.clusterConfig = clusterConfig;
@ -180,7 +209,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
format();
if (started.compareAndSet(false, true)) {
clusterTestKit.startup();
TestUtils.waitForCondition(
waitForCondition(
() -> this.clusterTestKit.brokers().values().stream().allMatch(
brokers -> brokers.brokerState() == BrokerState.RUNNING
), "Broker never made it to RUNNING state.");

View File

@ -44,7 +44,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.AutoStart;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
@ -302,7 +301,7 @@ public class ClusterTestExtensionsTest {
producer.flush();
consumer.subscribe(List.of(topic));
List<ConsumerRecord<String, String>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
RaftClusterInvocationContext.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
@ -330,7 +329,7 @@ public class ClusterTestExtensionsTest {
producer.flush();
consumer.subscribe(List.of(topic));
List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
RaftClusterInvocationContext.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
@ -407,7 +406,7 @@ public class ClusterTestExtensionsTest {
}
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer()) {
consumer.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> {
RaftClusterInvocationContext.waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
return records.count() == 1;
}, "Failed to receive message");
@ -438,7 +437,7 @@ public class ClusterTestExtensionsTest {
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(nonAdminConfig)) {
consumer.subscribe(List.of(topic));
AtomicBoolean hasException = new AtomicBoolean(false);
TestUtils.waitForCondition(() -> {
RaftClusterInvocationContext.waitForCondition(() -> {
if (hasException.get()) {
return true;
}
@ -476,7 +475,7 @@ public class ClusterTestExtensionsTest {
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(unknownUserConfig)) {
consumer.subscribe(List.of(topic));
AtomicBoolean hasException = new AtomicBoolean(false);
TestUtils.waitForCondition(() -> {
RaftClusterInvocationContext.waitForCondition(() -> {
if (hasException.get()) {
return true;
}

View File

@ -20,11 +20,11 @@ import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;