KAFKA-10571; Replace blackout with backoff for KIP-629

This replaces code and comment occurrences as described in the KIP

Author: Xavier Léauté <xvrl@apache.org>

Reviewers: Gwen Shapira, Mickael Maison

Closes #9366 from xvrl/kafka-10571
This commit is contained in:
Xavier Léauté 2020-10-08 15:54:59 -07:00 committed by Gwen Shapira
parent 5fc3f73f08
commit 4ab72780dd
10 changed files with 34 additions and 34 deletions

View File

@ -948,7 +948,7 @@ public class Fetcher<K, V> implements Closeable {
if (client.isUnavailable(leader)) {
client.maybeThrowAuthFailure(leader);
// The connection has failed and we need to await the blackout period before we can
// The connection has failed and we need to await the backoff period before we can
// try again. No need to request a metadata update since the disconnect will have
// done so already.
log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
@ -1173,7 +1173,7 @@ public class Fetcher<K, V> implements Closeable {
if (client.isUnavailable(node)) {
client.maybeThrowAuthFailure(node);
// If we try to send during the reconnect blackout window, then the request is just
// If we try to send during the reconnect backoff window, then the request is just
// going to be failed anyway before being sent, so skip the send for now
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (this.nodesWithPendingFetchRequests.contains(node.id())) {

View File

@ -126,7 +126,7 @@ public class MockClient implements KafkaClient {
return connectionDelay(node, now);
}
public void blackout(Node node, long durationMs) {
public void backoff(Node node, long durationMs) {
connectionState(node.idString()).backoff(time.milliseconds() + durationMs);
}
@ -143,15 +143,15 @@ public class MockClient implements KafkaClient {
connectionState(node.idString()).setReadyDelayed(time.milliseconds() + durationMs);
}
public void authenticationFailed(Node node, long blackoutMs) {
public void authenticationFailed(Node node, long backoffMs) {
pendingAuthenticationErrors.remove(node);
authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
disconnect(node.idString());
blackout(node, blackoutMs);
backoff(node, backoffMs);
}
public void createPendingAuthenticationError(Node node, long blackoutMs) {
pendingAuthenticationErrors.put(node, blackoutMs);
public void createPendingAuthenticationError(Node node, long backoffMs) {
pendingAuthenticationErrors.put(node, backoffMs);
}
@Override
@ -190,12 +190,12 @@ public class MockClient implements KafkaClient {
pendingAuthenticationErrors.entrySet().iterator(); authErrorIter.hasNext(); ) {
Map.Entry<Node, Long> entry = authErrorIter.next();
Node node = entry.getKey();
long blackoutMs = entry.getValue();
long backoffMs = entry.getValue();
if (node.idString().equals(request.destination())) {
authErrorIter.remove();
// Set up a disconnected ClientResponse and create an authentication error
// for the affected node.
authenticationFailed(node, blackoutMs);
authenticationFailed(node, backoffMs);
AbstractRequest.Builder<?> builder = request.requestBuilder();
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());

View File

@ -934,7 +934,7 @@ public class NetworkClientTest {
client.connectionFailed(node));
assertFalse(client.canConnect(node, time.milliseconds()));
// ensure disconnect does not reset blackout period if already disconnected
// ensure disconnect does not reset backoff period if already disconnected
time.sleep(reconnectBackoffMaxMsTest);
assertTrue(client.canConnect(node, time.milliseconds()));
client.disconnect(node.idString());

View File

@ -248,9 +248,9 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
// blackout the coordinator for 10 milliseconds to simulate a disconnect.
// cut out the coordinator for 10 milliseconds to simulate a disconnect.
// after backing off, we should be able to connect.
mockClient.blackout(coordinatorNode, 10L);
mockClient.backoff(coordinatorNode, 10L);
long initialTime = mockTime.milliseconds();
coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
@ -976,7 +976,7 @@ public class AbstractCoordinatorTest {
public void testLookupCoordinator() {
setupCoordinator();
mockClient.blackout(node, 50);
mockClient.backoff(node, 50);
RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator();
assertTrue("Failed future expected", noBrokersAvailableFuture.failed());
mockTime.sleep(50);

View File

@ -86,7 +86,7 @@ public class ConsumerNetworkClientTest {
}
@Test
public void sendWithinBlackoutPeriodAfterAuthenticationFailure() {
public void sendWithinBackoffPeriodAfterAuthenticationFailure() {
client.authenticationFailed(node, 300);
client.prepareResponse(heartbeatResponse(Errors.NONE));
final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
@ -94,7 +94,7 @@ public class ConsumerNetworkClientTest {
assertTrue(future.failed());
assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException);
time.sleep(30); // wait less than the blackout period
time.sleep(30); // wait less than the backoff period
assertTrue(client.connectionFailed(node));
final RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());

View File

@ -352,7 +352,7 @@ public class FetcherTest {
subscriptions.seek(tp0, 0);
Node node = initialUpdateResponse.brokers().iterator().next();
client.blackout(node, 500);
client.backoff(node, 500);
assertEquals(0, fetcher.sendFetches());
time.sleep(500);
@ -1543,7 +1543,7 @@ public class FetcherTest {
// Check that we skip sending the ListOffset request when the node is blacked out
client.updateMetadata(initialUpdateResponse);
Node node = initialUpdateResponse.brokers().iterator().next();
client.blackout(node, 500);
client.backoff(node, 500);
fetcher.resetOffsetsIfNeeded();
assertEquals(0, consumerClient.pendingRequestCount());
consumerClient.pollNoWakeup();

View File

@ -468,7 +468,7 @@ public class SenderTest {
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
client.backoff(clusterNode, 100);
sender.runOnce(); // We should try to flush the batch, but we expire it instead without sending anything.
assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
@ -993,7 +993,7 @@ public class SenderTest {
Node node = metadata.fetch().nodes().get(0);
time.sleep(10000L);
client.disconnect(node.idString());
client.blackout(node, 10);
client.backoff(node, 10);
sender.runOnce();
@ -1031,7 +1031,7 @@ public class SenderTest {
// Note deliveryTimeoutMs is 1500.
time.sleep(600L);
client.disconnect(node.idString());
client.blackout(node, 10);
client.backoff(node, 10);
sender.runOnce(); // now expire the first batch.
assertFutureFailure(request1, TimeoutException.class);
@ -1093,7 +1093,7 @@ public class SenderTest {
Node node = metadata.fetch().nodes().get(0);
time.sleep(1000L);
client.disconnect(node.idString());
client.blackout(node, 10);
client.backoff(node, 10);
sender.runOnce(); // now expire the first batch.
assertFutureFailure(request1, TimeoutException.class);
@ -1149,7 +1149,7 @@ public class SenderTest {
Node node = metadata.fetch().nodes().get(0);
time.sleep(1000L);
client.disconnect(node.idString());
client.blackout(node, 10);
client.backoff(node, 10);
sender.runOnce(); // now expire the first batch.
assertFutureFailure(request1, TimeoutException.class);
@ -1181,7 +1181,7 @@ public class SenderTest {
Node node = metadata.fetch().nodes().get(0);
time.sleep(15000L);
client.disconnect(node.idString());
client.blackout(node, 10);
client.backoff(node, 10);
sender.runOnce(); // now expire the batch.

View File

@ -1011,11 +1011,11 @@ public class TransactionManagerTest {
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
client.disconnect(brokerNode.idString());
client.blackout(brokerNode, 100);
client.backoff(brokerNode, 100);
// send pid to coordinator. Should get disconnected before the send and resend the FindCoordinator
// and InitPid requests.
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
time.sleep(110); // waiting for the blackout period for the node to expire.
time.sleep(110); // waiting for the backoff period for the node to expire.
assertFalse(initPidResult.isCompleted());
assertFalse(transactionManager.hasProducerId());
@ -2358,7 +2358,7 @@ public class TransactionManagerTest {
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
client.backoff(clusterNode, 100);
runUntil(responseFuture::isDone);
@ -2409,7 +2409,7 @@ public class TransactionManagerTest {
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
client.backoff(clusterNode, 100);
runUntil(firstBatchResponse::isDone);
runUntil(secondBatchResponse::isDone);
@ -2525,7 +2525,7 @@ public class TransactionManagerTest {
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
client.backoff(clusterNode, 100);
runUntil(responseFuture::isDone); // We should try to flush the produce, but expire it instead without sending anything.
@ -2909,7 +2909,7 @@ public class TransactionManagerTest {
// expire the batch.
Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
client.backoff(clusterNode, 100);
runUntil(responseFuture2::isDone); // We should try to flush the produce, but expire it instead without sending anything.
@ -2917,7 +2917,7 @@ public class TransactionManagerTest {
TransactionalRequestResult abortResult = transactionManager.beginAbort();
sender.runOnce(); // handle the abort
time.sleep(110); // Sleep to make sure the node blackout period has passed
time.sleep(110); // Sleep to make sure the node backoff period has passed
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, initialEpoch);

View File

@ -166,7 +166,7 @@ object HostedPartition {
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
val IsrChangePropagationBackoff = 5000L
val IsrChangePropagationInterval = 60000L
}
@ -292,7 +292,7 @@ class ReplicaManager(val config: KafkaConfig,
val now = System.currentTimeMillis()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBackoff < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()

View File

@ -63,7 +63,7 @@ class KafkaNetworkChannelTest {
val destinationId = 2
val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
channel.updateEndpoint(destinationId, new InetSocketAddress(destinationNode.host, destinationNode.port))
client.blackout(destinationNode, 500)
client.backoff(destinationNode, 500)
assertBrokerNotAvailable(destinationId)
}
@ -90,7 +90,7 @@ class KafkaNetworkChannelTest {
client.createPendingAuthenticationError(destinationNode, 100)
sendAndAssertErrorResponse(apiKey, destinationId, Errors.CLUSTER_AUTHORIZATION_FAILED)
// reset to clear blackout time
// reset to clear backoff time
client.reset()
}
}