KAFKA-18189: CoordinatorRequestManager log message can include incorrect coordinator disconnect time (#18109)

Fixed logic in markCoordinatorUnknown to ensure the warning log contains the correct number of milliseconds the client has been disconnected.

Reviewers: Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Kirk True 2024-12-11 08:22:51 -08:00 committed by GitHub
parent bd6d0fbf3d
commit d09e222846
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 100 additions and 8 deletions

View File

@ -139,22 +139,34 @@ public class CoordinatorRequestManager implements RequestManager {
}
/**
* Mark the current coordinator null.
* Mark the coordinator as "unknown" (i.e. {@code null}) when a disconnect is detected. This detection can occur
* in one of two paths:
*
* @param cause why the coordinator is marked unknown.
* @param currentTimeMs the current time in ms.
* <ol>
* <li>The coordinator was discovered, but then later disconnected</li>
* <li>The coordinator has not yet been discovered and/or connected</li>
* </ol>
*
* @param cause String explanation of why the coordinator is marked unknown
* @param currentTimeMs Current time in milliseconds
*/
public void markCoordinatorUnknown(final String cause, final long currentTimeMs) {
if (this.coordinator != null) {
log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+ "Rediscovery will be attempted.", this.coordinator, cause);
this.coordinator = null;
if (coordinator != null || timeMarkedUnknownMs == -1) {
timeMarkedUnknownMs = currentTimeMs;
totalDisconnectedMin = 0;
}
if (coordinator != null) {
log.info(
"Group coordinator {} is unavailable or invalid due to cause: {}. Rediscovery will be attempted.",
coordinator,
cause
);
coordinator = null;
} else {
long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs - timeMarkedUnknownMs);
long currDisconnectMin = durationOfOngoingDisconnectMs / COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
if (currDisconnectMin > this.totalDisconnectedMin) {
if (currDisconnectMin > totalDisconnectedMin) {
log.debug("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnectMs);
totalDisconnectedMin = currDisconnectMin;
}

View File

@ -28,16 +28,24 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -75,6 +83,78 @@ public class CoordinatorRequestManagerTest {
assertEquals(Collections.emptyList(), pollResult.unsentRequests);
}
/**
* This test mimics a client that has been disconnected from the coordinator. When the client remains disconnected
* from the coordinator for 60 seconds, the client will begin to emit a warning log every minute thereafter to
* alert the user about the ongoing disconnect status. The warning log includes the length of time of the ongoing
* disconnect:
*
* <code>
* Consumer has been disconnected from the group coordinator for XXXXXms
* </code>
*
* <p/>
*
* However, the logic used to calculate the length of the disconnect was not correct. This test exercises the
* disconnect logic, controlling the logging and system time, to ensure the warning message is correct.
*
* @see CoordinatorRequestManager#markCoordinatorUnknown(String, long)
*/
@Test
public void testMarkCoordinatorUnknownLoggingAccuracy() {
long oneMinute = 60000;
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
// You'd be forgiven for assuming that a warning message would be logged at WARN, but
// markCoordinatorUnknown logs the warning at DEBUG. This is partly for historical parity with the
// ClassicKafkaConsumer.
appender.setClassLogger(CoordinatorRequestManager.class, Level.DEBUG);
CoordinatorRequestManager coordinatorRequestManager = setupCoordinatorManager(GROUP_ID);
assertFalse(coordinatorRequestManager.coordinator().isPresent());
// Step 1: mark the coordinator as disconnected right after creation of the CoordinatorRequestManager.
// Because the disconnect occurred immediately, no warning should be logged.
coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
assertTrue(millisecondsFromLog(appender).isEmpty());
// Step 2: sleep for one minute and mark the coordinator unknown again. Then verify that the warning was
// logged and the reported time is accurate.
time.sleep(oneMinute);
coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
Optional<Long> firstLogMs = millisecondsFromLog(appender);
assertTrue(firstLogMs.isPresent());
assertEquals(oneMinute, firstLogMs.get());
// Step 3: sleep for *another* minute, mark the coordinator unknown again, and verify the accuracy.
time.sleep(oneMinute);
coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds());
Optional<Long> secondLogMs = millisecondsFromLog(appender);
assertTrue(secondLogMs.isPresent());
assertEquals(oneMinute * 2, secondLogMs.get());
}
}
private Optional<Long> millisecondsFromLog(LogCaptureAppender appender) {
Pattern pattern = Pattern.compile("\\s+(?<millis>\\d+)+ms");
List<Long> milliseconds = appender.getMessages().stream()
.map(pattern::matcher)
.filter(Matcher::find)
.map(matcher -> matcher.group("millis"))
.filter(Objects::nonNull)
.map(millisString -> {
try {
return Long.parseLong(millisString);
} catch (NumberFormatException e) {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
// Return the most recent log entry that matches the message in markCoordinatorUnknown, if present.
return milliseconds.isEmpty() ? Optional.empty() : Optional.of(milliseconds.get(milliseconds.size() - 1));
}
@Test
public void testMarkCoordinatorUnknown() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);