MINOR: add startup timeouts to KRaft integration tests (#13153)

When running JUnit tests, it is not good to block forever on CompletableFuture objects. When there
are bugs, this can lead to tests hanging forever, and Jenkins does not deal with this well: a single
hung test often brings down the whole multi-hour test run. Therefore, when running integration tests
in JUnit, set some reasonable time limits on broker and controller startup time. (A sketch of how
tests use the new limit follows the commit metadata below.)

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Colin Patrick McCabe, 2023-01-30 11:29:30 -08:00 (committed by GitHub)
parent 17559d581e
commit eb7d5cbf15
12 changed files with 386 additions and 43 deletions
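
The heart of the change is a new internal configuration, server.max.startup.time.ms, which bounds how long BrokerServer.startup() and ControllerServer.startup() may block. As a rough sketch of how a test sets it, assuming only what the diff below defines (the surrounding class is hypothetical; the property name and the ten-minute value mirror what KafkaClusterTestKit and QuorumTestHarness set):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class StartupTimeoutSketch {
    public static void main(String[] args) {
        // The internal config added by this commit; it defaults to
        // Long.MaxValue, i.e. no limit, so production brokers are unaffected.
        Properties props = new Properties();
        props.setProperty("server.max.startup.time.ms",
            Long.toString(TimeUnit.MINUTES.toMillis(10)));
        // Build the KafkaConfig under test from these props. If startup()
        // cannot finish before the deadline, it now fails with a
        // TimeoutException instead of hanging the JUnit run.
        System.out.println(props);
    }
}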

File: Time.java

@@ -17,7 +17,10 @@
 package org.apache.kafka.common.utils;

 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;

 /**
@@ -86,4 +89,30 @@ public interface Time {
         return timer(timeout.toMillis());
     }

+    /**
+     * Wait for a future to complete, or time out.
+     *
+     * @param future The future to wait for.
+     * @param deadlineNs The time in the future, in monotonic nanoseconds, to time out.
+     * @return The result of the future.
+     * @param <T> The type of the future.
+     */
+    default <T> T waitForFuture(
+        CompletableFuture<T> future,
+        long deadlineNs
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        TimeoutException timeoutException = null;
+        while (true) {
+            long nowNs = nanoseconds();
+            if (deadlineNs <= nowNs) {
+                throw (timeoutException == null) ? new TimeoutException() : timeoutException;
+            }
+            long deltaNs = deadlineNs - nowNs;
+            try {
+                return future.get(deltaNs, TimeUnit.NANOSECONDS);
+            } catch (TimeoutException t) {
+                timeoutException = t;
+            }
+        }
+    }
 }
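
The new Time#waitForFuture default method takes an absolute deadline in monotonic nanoseconds rather than a relative timeout, so a single deadline can be threaded through several sequential waits, as BrokerServer.startup() does below. A minimal sketch of calling it directly (the future and the 30-second delay are illustrative):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.utils.Time;

public class WaitForFutureSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("ready");  // normally completed by another thread
        // The deadline is absolute: "now" on the monotonic clock plus 30 seconds.
        long deadlineNs = Time.SYSTEM.nanoseconds() + TimeUnit.SECONDS.toNanos(30);
        System.out.println(Time.SYSTEM.waitForFuture(future, deadlineNs));
    }
}

Because the loop re-reads nanoseconds() on every retry, the remaining wait shrinks on each pass until the deadline is reached.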

File: BrokerServer.scala

@@ -47,7 +47,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.log.internals.LogDirFailureChannel
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
 import org.apache.kafka.snapshot.SnapshotWriter

 import java.net.InetAddress
@@ -179,6 +179,7 @@ class BrokerServer(

   override def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+    val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
     try {
       sharedServer.startForBroker()
@@ -216,7 +217,10 @@
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

-      val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get()).asScala
+      val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
+        "controller quorum voters future", sharedServer.controllerQuorumVotersFuture,
+        startupDeadline, time)
+      val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

       clientToControllerChannelManager = BrokerToControllerChannelManager(
@@ -436,13 +440,8 @@
         config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)

-      info("Waiting for broker metadata to catch up.")
-      try {
-        lifecycleManager.initialCatchUpFuture.get()
-      } catch {
-        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
-          "waiting for the broker to catch up with the current cluster metadata.", t)
-      }
+      FutureUtils.waitWithLogging(logger.underlying, "broker metadata to catch up",
+        lifecycleManager.initialCatchUpFuture, startupDeadline, time)

       // Apply the metadata log changes that we've accumulated.
       metadataPublisher = new BrokerMetadataPublisher(config,
@@ -465,12 +464,9 @@
       // publish operation to complete. This first operation will initialize logManager,
       // replicaManager, groupCoordinator, and txnCoordinator. The log manager may perform
       // a potentially lengthy recovery-from-unclean-shutdown operation here, if required.
-      try {
-        metadataListener.startPublishing(metadataPublisher).get()
-      } catch {
-        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
-          "waiting for the broker to catch up with the current cluster metadata.", t)
-      }
+      FutureUtils.waitWithLogging(logger.underlying,
+        "the broker to catch up with the current cluster metadata",
+        metadataListener.startPublishing(metadataPublisher), startupDeadline, time)

       // Log static broker configurations.
       new KafkaConfig(config.originals(), true)
@@ -492,20 +488,12 @@
       // We're now ready to unfence the broker. This also allows this broker to transition
       // from RECOVERY state to RUNNING state, once the controller unfences the broker.
-      try {
-        lifecycleManager.setReadyToUnfence().get()
-      } catch {
-        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
-          "waiting for the broker to be unfenced.", t)
-      }
+      FutureUtils.waitWithLogging(logger.underlying, "the broker to be unfenced",
+        lifecycleManager.setReadyToUnfence(), startupDeadline, time)

       // Block here until all the authorizer futures are complete
-      try {
-        CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join()
-      } catch {
-        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
-          "waiting for all of the authorizer futures to be completed.", t)
-      }
+      FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer futures to be completed",
+        CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)

       maybeChangeStatus(STARTING, STARTED)
     } catch {

File: ControllerServer.scala

@@ -44,6 +44,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
+import org.apache.kafka.server.util.{Deadline, FutureUtils}

 import java.util.OptionalLong
 import java.util.concurrent.locks.ReentrantLock
@@ -128,6 +129,7 @@ class ControllerServer(

   def startup(): Unit = {
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+    val startupDeadline = Deadline.fromDelay(time, config.serverMaxStartupTimeMs, TimeUnit.MILLISECONDS)
     try {
       info("Starting controller")
       config.dynamicConfig.initialize(zkClientOpt = None)
@@ -192,7 +194,11 @@
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))

-      val controllerNodes = RaftConfig.voterConnectionsToNodes(sharedServer.controllerQuorumVotersFuture.get())
+      val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
+        "controller quorum voters future",
+        sharedServer.controllerQuorumVotersFuture,
+        startupDeadline, time)
+      val controllerNodes = RaftConfig.voterConnectionsToNodes(voterConnections)
       val quorumFeatures = QuorumFeatures.create(config.nodeId,
         sharedServer.raftManager.apiVersions,
         QuorumFeatures.defaultFeatureMap(),
@@ -291,13 +297,10 @@
        * and KIP-801 for details.
        */
       socketServer.enableRequestProcessing(authorizerFutures)

       // Block here until all the authorizer futures are complete
-      try {
-        CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join()
-      } catch {
-        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
-          "waiting for all of the authorizer futures to be completed.", t)
-      }
+      FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer futures to be completed",
+        CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)
     } catch {
       case e: Throwable =>
         maybeChangeStatus(STARTING, STARTED)

File: KafkaConfig.scala

@@ -89,6 +89,7 @@ object Defaults {

   /** KRaft mode configs */
   val EmptyNodeId: Int = -1
+  val ServerMaxStartupTimeMs = Long.MaxValue

   /************* Authorizer Configuration ***********/
   val AuthorizerClassName = ""
@@ -376,6 +377,7 @@ object KafkaConfig {
   val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
   val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
   val MetadataMaxIdleIntervalMsProp = "metadata.max.idle.interval.ms"
+  val ServerMaxStartupTimeMsProp = "server.max.startup.time.ms"

   /** ZK to KRaft Migration configs */
   val MigrationEnabledProp = "zookeeper.metadata.migration.enable"
@@ -713,6 +715,8 @@
   val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI."
   val MetadataLogSegmentBytesDoc = "The maximum size of a single metadata log file."
   val MetadataLogSegmentMinBytesDoc = "Override the minimum size for a single metadata log file. This should be used for testing only."
+  val ServerMaxStartupTimeMsDoc = "The maximum number of milliseconds we will wait for the server to come up. " +
+    "By default there is no limit. This should be used for testing only."

   val MetadataLogSegmentMillisDoc = "The maximum time before a new metadata log file is rolled out (in milliseconds)."
   val MetadataMaxRetentionBytesDoc = "The maximum combined size of the metadata log and snapshots before deleting old " +
@@ -1086,7 +1090,7 @@
   val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."

   @nowarn("cat=deprecation")
-  private[server] val configDef = {
+  val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
     import ConfigDef.Type._
@@ -1135,10 +1139,6 @@
        */
       .define(MetadataSnapshotMaxNewRecordBytesProp, LONG, Defaults.MetadataSnapshotMaxNewRecordBytes, atLeast(1), HIGH, MetadataSnapshotMaxNewRecordBytesDoc)
       .define(MetadataSnapshotMaxIntervalMsProp, LONG, Defaults.MetadataSnapshotMaxIntervalMs, atLeast(0), HIGH, MetadataSnapshotMaxIntervalMsDoc)
-      /*
-       * KRaft mode private configs. Note that these configs are defined as internal. We will make them public in the 3.0.0 release.
-       */
       .define(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
       .define(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
       .define(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
@@ -1153,6 +1153,7 @@
       .define(MetadataMaxRetentionBytesProp, LONG, Defaults.MetadataMaxRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
       .define(MetadataMaxRetentionMillisProp, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, MetadataMaxRetentionMillisDoc)
       .define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, MetadataMaxIdleIntervalMsDoc)
+      .defineInternal(ServerMaxStartupTimeMsProp, LONG, Defaults.ServerMaxStartupTimeMs, atLeast(0), MEDIUM, ServerMaxStartupTimeMsDoc)
       .define(MigrationEnabledProp, BOOLEAN, false, HIGH, "Enable ZK to KRaft migration")

   /************* Authorizer Configuration ***********/
@@ -1660,6 +1661,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def metadataLogSegmentMillis = getLong(KafkaConfig.MetadataLogSegmentMillisProp)
   def metadataRetentionBytes = getLong(KafkaConfig.MetadataMaxRetentionBytesProp)
   def metadataRetentionMillis = getLong(KafkaConfig.MetadataMaxRetentionMillisProp)
+  val serverMaxStartupTimeMs = getLong(KafkaConfig.ServerMaxStartupTimeMsProp)
   def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)

File: KafkaClusterTestKit.java

@@ -155,6 +155,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 ControllerNode controllerNode = nodes.controllerNodes().get(node.id());
                 Map<String, String> props = new HashMap<>(configProps);
+                props.put(KafkaConfig$.MODULE$.ServerMaxStartupTimeMsProp(),
+                    Long.toString(TimeUnit.MINUTES.toMillis(10)));
                 props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
                 props.put(KafkaConfig$.MODULE$.NodeIdProp(),
                     Integer.toString(node.id()));

File: PlaintextAdminIntegrationTest.scala

@@ -411,7 +411,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertFalse(maxMessageBytes2.isSensitive)
     assertFalse(maxMessageBytes2.isReadOnly)

-    assertEquals(brokers(1).config.nonInternalValues.size, configs.get(brokerResource1).entries.size)
+    // Find the number of internal configs that we have explicitly set in the broker config.
+    // These will appear when we describe the broker configuration. Other internal configs,
+    // that we have not set, will not appear there.
+    val numInternalConfigsSet = brokers.head.config.originals.keySet().asScala.count(k => {
+      Option(KafkaConfig.configDef.configKeys().get(k)) match {
+        case None => false
+        case Some(configDef) => configDef.internalConfig
+      }
+    })
+    assertEquals(brokers(1).config.nonInternalValues.size + numInternalConfigsSet,
+      configs.get(brokerResource1).entries.size)
     assertEquals(brokers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value)
     val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp)
     assertEquals(brokers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value)
@@ -432,7 +442,8 @@
     assertFalse(compressionType.isSensitive)
     assertFalse(compressionType.isReadOnly)

-    assertEquals(brokers(2).config.nonInternalValues.size, configs.get(brokerResource2).entries.size)
+    assertEquals(brokers(2).config.nonInternalValues.size + numInternalConfigsSet,
+      configs.get(brokerResource2).entries.size)
     assertEquals(brokers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
     assertEquals(brokers(2).config.logCleanerThreads.toString,
       configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)

File: QuorumTestHarness.scala

@@ -21,7 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
 import java.net.InetSocketAddress
 import java.util
 import java.util.{Collections, Properties}
-import java.util.concurrent.CompletableFuture
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 import javax.security.auth.login.Configuration
 import kafka.tools.StorageTool
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
@@ -295,6 +295,7 @@ abstract class QuorumTestHarness extends Logging {
       throw new RuntimeException("Only one KRaft controller is supported for now.")
     }
     val props = propsList(0)
+    props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString)
     props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
     if (props.getProperty(KafkaConfig.NodeIdProp) == null) {
       props.setProperty(KafkaConfig.NodeIdProp, "1000")

File: TestUtils.scala

@@ -306,6 +306,7 @@ object TestUtils extends Logging {
     val props = new Properties
     if (zkConnect == null) {
+      props.setProperty(KafkaConfig.ServerMaxStartupTimeMsProp, TimeUnit.MINUTES.toMillis(10).toString)
       props.put(KafkaConfig.NodeIdProp, nodeId.toString)
       props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
       props.put(KafkaConfig.AdvertisedListenersProp, listeners)

File: Deadline.java (new file)

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import org.apache.kafka.common.utils.Time;

import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class Deadline {
    private final long nanoseconds;

    public static Deadline fromMonotonicNanoseconds(
        long nanoseconds
    ) {
        return new Deadline(nanoseconds);
    }

    public static Deadline fromDelay(
        Time time,
        long delay,
        TimeUnit timeUnit
    ) {
        if (delay < 0) {
            throw new RuntimeException("Negative delays are not allowed.");
        }
        long nowNs = time.nanoseconds();
        BigInteger deadlineNs = BigInteger.valueOf(nowNs).
            add(BigInteger.valueOf(timeUnit.toNanos(delay)));
        if (deadlineNs.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) >= 0) {
            return new Deadline(Long.MAX_VALUE);
        } else {
            return new Deadline(deadlineNs.longValue());
        }
    }

    private Deadline(long nanoseconds) {
        this.nanoseconds = nanoseconds;
    }

    public long nanoseconds() {
        return nanoseconds;
    }

    @Override
    public int hashCode() {
        return Objects.hash(nanoseconds);
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || !(o.getClass().equals(this.getClass()))) return false;
        Deadline other = (Deadline) o;
        return nanoseconds == other.nanoseconds;
    }
}

File: FutureUtils.java (new file)

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class FutureUtils {
    /**
     * Wait for a future until a specific time in the future, with copious logging.
     *
     * @param log The slf4j object to use to log success and failure.
     * @param action The action we are waiting for.
     * @param future The future we are waiting for.
     * @param deadline The deadline in the future we are waiting for.
     * @param time The clock object.
     *
     * @return The result of the future.
     * @param <T> The type of the future.
     *
     * @throws java.util.concurrent.TimeoutException If the future times out.
     * @throws Throwable If the future fails. Note: we unwrap ExecutionException here.
     */
    public static <T> T waitWithLogging(
        Logger log,
        String action,
        CompletableFuture<T> future,
        Deadline deadline,
        Time time
    ) throws Throwable {
        log.info("Waiting for {}", action);
        try {
            T result = time.waitForFuture(future, deadline.nanoseconds());
            log.info("Finished waiting for {}", action);
            return result;
        } catch (TimeoutException t) {
            log.error("Timed out while waiting for {}", action, t);
            TimeoutException timeout = new TimeoutException("Timed out while waiting for " + action);
            timeout.setStackTrace(t.getStackTrace());
            throw timeout;
        } catch (Throwable t) {
            if (t instanceof ExecutionException) {
                ExecutionException executionException = (ExecutionException) t;
                t = executionException.getCause();
            }
            log.error("Received a fatal error while waiting for {}", action, t);
            throw new RuntimeException("Received a fatal error while waiting for " + action, t);
        }
    }
}
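
Taken together with Deadline, a typical call site looks like the sketch below (the action string, the value, and the 30-second delay are illustrative; the classes and signatures come from this commit):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WaitWithLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(WaitWithLoggingSketch.class);

    public static void main(String[] args) throws Throwable {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.complete(42);  // normally completed by the server's startup machinery
        // One Deadline is created at the top of startup() and reused for every wait.
        Deadline deadline = Deadline.fromDelay(Time.SYSTEM, 30, TimeUnit.SECONDS);
        int result = FutureUtils.waitWithLogging(log, "the example future",
            future, deadline, Time.SYSTEM);
        log.info("Got {}", result);
    }
}

On timeout the helper throws a fresh TimeoutException naming the action; on any other failure it unwraps ExecutionException and wraps the cause in a RuntimeException, matching the messages the hand-written try/catch blocks in BrokerServer and ControllerServer used to produce.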

File: DeadlineTest.java (new file)

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@Timeout(value = 120)
public class DeadlineTest {
    private static final Logger log = LoggerFactory.getLogger(DeadlineTest.class);

    private static Time monoTime(long monotonicTime) {
        return new MockTime(0, 0, monotonicTime);
    }

    @Test
    public void testOneMillisecondDeadline() {
        assertEquals(MILLISECONDS.toNanos(1),
            Deadline.fromDelay(monoTime(0), 1, MILLISECONDS).nanoseconds());
    }

    @Test
    public void testOneMillisecondDeadlineWithBase() {
        final long nowNs = 123456789L;
        assertEquals(nowNs + MILLISECONDS.toNanos(1),
            Deadline.fromDelay(monoTime(nowNs), 1, MILLISECONDS).nanoseconds());
    }

    @Test
    public void testNegativeDelayFails() {
        assertEquals("Negative delays are not allowed.",
            assertThrows(RuntimeException.class,
                () -> Deadline.fromDelay(monoTime(123456789L), -1L, MILLISECONDS)).
                    getMessage());
    }

    @Test
    public void testMaximumDelay() {
        assertEquals(Long.MAX_VALUE,
            Deadline.fromDelay(monoTime(123L), Long.MAX_VALUE, HOURS).nanoseconds());
        assertEquals(Long.MAX_VALUE,
            Deadline.fromDelay(monoTime(0), Long.MAX_VALUE / 2, MILLISECONDS).nanoseconds());
        assertEquals(Long.MAX_VALUE,
            Deadline.fromDelay(monoTime(Long.MAX_VALUE), Long.MAX_VALUE, NANOSECONDS).nanoseconds());
    }
}

File: FutureUtilsTest.java (new file)

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;

import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@Timeout(value = 120)
public class FutureUtilsTest {
    private static final Logger log = LoggerFactory.getLogger(FutureUtilsTest.class);

    @Test
    public void testWaitWithLogging() throws Throwable {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executorService.schedule(() -> future.complete(123), 1000, TimeUnit.NANOSECONDS);
        assertEquals(123, FutureUtils.waitWithLogging(log,
            "the future to be completed",
            future,
            Deadline.fromDelay(Time.SYSTEM, 30, TimeUnit.SECONDS),
            Time.SYSTEM));
        executorService.shutdownNow();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testWaitWithLoggingTimeout(boolean immediateTimeout) throws Throwable {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executorService.schedule(() -> future.complete(456), 10000, TimeUnit.MILLISECONDS);
        assertThrows(TimeoutException.class, () -> {
            FutureUtils.waitWithLogging(log,
                "the future to be completed",
                future,
                immediateTimeout ?
                    Deadline.fromDelay(Time.SYSTEM, 0, TimeUnit.SECONDS) :
                    Deadline.fromDelay(Time.SYSTEM, 1, TimeUnit.MILLISECONDS),
                Time.SYSTEM);
        });
        executorService.shutdownNow();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testWaitWithLoggingError() throws Throwable {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executorService.schedule(() -> {
            future.completeExceptionally(new IllegalArgumentException("uh oh"));
        }, 1, TimeUnit.NANOSECONDS);
        assertEquals("Received a fatal error while waiting for the future to be completed",
            assertThrows(RuntimeException.class, () -> {
                FutureUtils.waitWithLogging(log,
                    "the future to be completed",
                    future,
                    Deadline.fromDelay(Time.SYSTEM, 30, TimeUnit.SECONDS),
                    Time.SYSTEM);
            }).getMessage());
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}