MINOR: Extract HeartbeatRequestState from heartbeat request managers (#19043)

The AbstractHeartbeatRequestManager and the
StreamsGroupHeartbeatRequestManager, both use the
HeartbeatRequestState to track the state of the heartbeat requests. Both
heartbeat request managers have an implementation of
HeartbeatRequestState as inner class.
To deduplicate code this commit extracts the HeartbeatRequestState so
that the same code can be used by both heartbeat request manager.

Reviewers: Kirk True <ktrue@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2025-03-03 10:46:20 +01:00 committed by GitHub
parent a04dd21f26
commit 898dcd11ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 260 additions and 151 deletions

View File

@ -178,7 +178,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
// We can ignore the leave response because we can join before or after receiving the response. // We can ignore the leave response because we can join before or after receiving the response.
heartbeatRequestState.reset(); heartbeatRequestState.reset();
resetHeartbeatState(); resetHeartbeatState();
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat)); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
} }
// Case 1: The member is leaving // Case 1: The member is leaving
@ -192,7 +192,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
} }
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
} }
/** /**
@ -222,7 +222,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public PollResult pollOnClose(long currentTimeMs) { public PollResult pollOnClose(long currentTimeMs) {
if (membershipManager().isLeavingGroup()) { if (membershipManager().isLeavingGroup()) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
} }
return EMPTY; return EMPTY;
} }
@ -512,85 +512,4 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
* @return The heartbeat interval * @return The heartbeat interval
*/ */
public abstract long heartbeatIntervalForResponse(R response); public abstract long heartbeatIntervalForResponse(R response);
/**
* Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The
* object extends {@link RequestState} to enable exponential backoff and duplicated request handling. The two fields
* that it holds are:
*/
static class HeartbeatRequestState extends RequestState {
/**
* heartbeatTimer tracks the time since the last heartbeat was sent
*/
private final Timer heartbeatTimer;
/**
* The heartbeat interval which is acquired/updated through the heartbeat request
*/
private long heartbeatIntervalMs;
HeartbeatRequestState(
final LogContext logContext,
final Time time,
final long heartbeatIntervalMs,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final double jitter) {
super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer = time.timer(heartbeatIntervalMs);
}
private void update(final long currentTimeMs) {
this.heartbeatTimer.update(currentTimeMs);
}
void resetTimer() {
this.heartbeatTimer.reset(heartbeatIntervalMs);
}
@Override
public String toStringBase() {
return super.toStringBase() +
", remainingMs=" + heartbeatTimer.remainingMs() +
", heartbeatIntervalMs=" + heartbeatIntervalMs;
}
/**
* Check if a heartbeat request should be sent on the current time. A heartbeat should be
* sent if the heartbeat timer has expired, backoff has expired, and there is no request
* in-flight.
*/
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}
long timeToNextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.isExpired()) {
return this.remainingBackoffMs(currentTimeMs);
}
return heartbeatTimer.remainingMs();
}
@Override
public void onFailedAttempt(final long currentTimeMs) {
// Reset timer to allow sending HB after a failure without waiting for the interval.
// After a failure, a next HB may be needed with backoff (ex. errors that lead to
// retries, like coordinator load error), or immediately (ex. errors that lead to
// rejoining, like fencing errors).
heartbeatTimer.reset(0);
super.onFailedAttempt(currentTimeMs);
}
private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
// no need to update the timer if the interval hasn't changed
return;
}
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
} }

View File

@ -81,7 +81,7 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
final CoordinatorRequestManager coordinatorRequestManager, final CoordinatorRequestManager coordinatorRequestManager,
final ConsumerMembershipManager membershipManager, final ConsumerMembershipManager membershipManager,
final HeartbeatState heartbeatState, final HeartbeatState heartbeatState,
final AbstractHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState, final HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler, final BackgroundEventHandler backgroundEventHandler,
final Metrics metrics) { final Metrics metrics) {
super(logContext, timer, config, coordinatorRequestManager, heartbeatRequestState, backgroundEventHandler, super(logContext, timer, config, coordinatorRequestManager, heartbeatRequestState, backgroundEventHandler,

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
/**
* Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff.
*
* The class extends {@link org.apache.kafka.clients.consumer.internals.RequestState} to enable exponential backoff
* and duplicated request handling.
*/
public class HeartbeatRequestState extends RequestState {
/**
* The heartbeat timer tracks the time since the last heartbeat was sent
*/
private final Timer heartbeatTimer;
/**
* The heartbeat interval which is acquired/updated through the heartbeat request
*/
private long heartbeatIntervalMs;
public HeartbeatRequestState(final LogContext logContext,
final Time time,
final long heartbeatIntervalMs,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final double jitter) {
super(
logContext,
HeartbeatRequestState.class.getName(),
retryBackoffMs,
2,
retryBackoffMaxMs,
jitter
);
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer = time.timer(heartbeatIntervalMs);
}
public long heartbeatIntervalMs() {
return heartbeatIntervalMs;
}
public void resetTimer() {
this.heartbeatTimer.reset(heartbeatIntervalMs);
}
public long timeToNextHeartbeatMs(final long currentTimeMs) {
if (heartbeatTimer.isExpired()) {
return remainingBackoffMs(currentTimeMs);
}
return heartbeatTimer.remainingMs();
}
/**
* @inheritDoc
*
* Adds to the overridden method the reset of the heartbeat timer to a zero interval which allows sending
* heartbeats after a failure without waiting for the interval.
* After a failure, a next heartbeat may be needed with backoff (ex. errors that lead to retries, like coordinator
* load error), or immediately (ex. errors that lead to rejoining, like fencing errors).
*/
@Override
public void onFailedAttempt(final long currentTimeMs) {
heartbeatTimer.reset(0);
super.onFailedAttempt(currentTimeMs);
}
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}
private void update(final long currentTimeMs) {
this.heartbeatTimer.update(currentTimeMs);
}
public void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
// no need to update the timer if the interval hasn't changed
return;
}
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
@Override
public String toStringBase() {
return super.toStringBase() +
", remainingMs=" + heartbeatTimer.remainingMs() +
", heartbeatIntervalMs=" + heartbeatIntervalMs;
}
}

View File

@ -77,7 +77,7 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
final CoordinatorRequestManager coordinatorRequestManager, final CoordinatorRequestManager coordinatorRequestManager,
final ShareMembershipManager membershipManager, final ShareMembershipManager membershipManager,
final HeartbeatState heartbeatState, final HeartbeatState heartbeatState,
final AbstractHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState, final HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler, final BackgroundEventHandler backgroundEventHandler,
final Metrics metrics) { final Metrics metrics) {
super(logContext, timer, config, coordinatorRequestManager, heartbeatRequestState, backgroundEventHandler, super(logContext, timer, config, coordinatorRequestManager, heartbeatRequestState, backgroundEventHandler,

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -207,64 +206,6 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
} }
/**
* Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends
* {@link RequestState} to enable exponential backoff and duplicated request handling. The two fields that it holds are:
*/
static class HeartbeatRequestState extends RequestState {
/**
* The heartbeat timer tracks the time since the last heartbeat was sent
*/
private final Timer heartbeatTimer;
/**
* The heartbeat interval which is acquired/updated through the heartbeat request
*/
private long heartbeatIntervalMs;
public HeartbeatRequestState(final LogContext logContext,
final Time time,
final long heartbeatIntervalMs,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final double jitter) {
super(
logContext,
StreamsGroupHeartbeatRequestManager.HeartbeatRequestState.class.getName(),
retryBackoffMs,
2,
retryBackoffMaxMs,
jitter
);
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer = time.timer(heartbeatIntervalMs);
}
private void update(final long currentTimeMs) {
this.heartbeatTimer.update(currentTimeMs);
}
public void resetTimer() {
this.heartbeatTimer.reset(heartbeatIntervalMs);
}
@Override
public boolean canSendRequest(final long currentTimeMs) {
update(currentTimeMs);
return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
}
private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
// no need to update the timer if the interval hasn't changed
return;
}
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
private final Logger logger; private final Logger logger;
private final int maxPollIntervalMs; private final int maxPollIntervalMs;
@ -318,7 +259,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
@Override @Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) { public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult( return new NetworkClientDelegate.PollResult(
heartbeatRequestState.heartbeatIntervalMs, heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(makeHeartbeatRequest(currentTimeMs)) Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
); );
} }

View File

@ -20,7 +20,6 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.HeartbeatRequestState;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment; import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager.HeartbeatState; import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager.HeartbeatState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HeartbeatRequestStateTest {
private static final LogContext LOG_CONTEXT = new LogContext("test ");
private static final long HEARTBEAT_INTERVAL_MS = 1000;
private static final long RETRY_BACKOFF_MS = 100;
private static final long RETRY_BACKOFF_MAX_MS = 500;
private static final double JITTER = 0.2;
private final Time time = new MockTime();
@Test
public void testCanSendRequestAndTimeToNextHeartbeatMs() {
final HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
LOG_CONTEXT,
time,
HEARTBEAT_INTERVAL_MS,
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MAX_MS,
JITTER
);
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
time.sleep(HEARTBEAT_INTERVAL_MS - 1);
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(1, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
time.sleep(1);
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
time.sleep(100);
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
}
@Test
public void testResetTimer() {
final HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
LOG_CONTEXT,
time,
HEARTBEAT_INTERVAL_MS,
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MAX_MS,
JITTER
);
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(0, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
heartbeatRequestState.resetTimer();
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
}
@Test
public void testUpdateHeartbeatIntervalMs() {
final HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
LOG_CONTEXT,
time,
HEARTBEAT_INTERVAL_MS,
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MAX_MS,
JITTER
);
final long updatedHeartbeatIntervalMs = 2 * HEARTBEAT_INTERVAL_MS;
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
heartbeatRequestState.updateHeartbeatIntervalMs(updatedHeartbeatIntervalMs);
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertEquals(2 * HEARTBEAT_INTERVAL_MS, heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()));
}
@Test
public void testUpdateHeartbeatIntervalMsWithSameInterval() {
final HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
LOG_CONTEXT,
time,
HEARTBEAT_INTERVAL_MS,
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MAX_MS,
JITTER
);
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
heartbeatRequestState.updateHeartbeatIntervalMs(HEARTBEAT_INTERVAL_MS);
assertEquals(HEARTBEAT_INTERVAL_MS, heartbeatRequestState.heartbeatIntervalMs());
assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
}
@Test
public void testOnFailedAttempt() {
final HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState(
LOG_CONTEXT,
time,
HEARTBEAT_INTERVAL_MS,
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MAX_MS,
JITTER
);
time.sleep(HEARTBEAT_INTERVAL_MS + 100);
heartbeatRequestState.onFailedAttempt(time.milliseconds());
assertFalse(heartbeatRequestState.canSendRequest(time.milliseconds()));
assertTrue(heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()) > 0);
}
}

View File

@ -94,7 +94,7 @@ public class ShareHeartbeatRequestManagerTest {
private Metadata metadata; private Metadata metadata;
private ShareHeartbeatRequestManager heartbeatRequestManager; private ShareHeartbeatRequestManager heartbeatRequestManager;
private ShareMembershipManager membershipManager; private ShareMembershipManager membershipManager;
private ShareHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestState heartbeatRequestState;
private ShareHeartbeatRequestManager.HeartbeatState heartbeatState; private ShareHeartbeatRequestManager.HeartbeatState heartbeatState;
private BackgroundEventHandler backgroundEventHandler; private BackgroundEventHandler backgroundEventHandler;
private Metrics metrics; private Metrics metrics;
@ -114,7 +114,7 @@ public class ShareHeartbeatRequestManagerTest {
logContext = new LogContext(); logContext = new LogContext();
ConsumerConfig config = mock(ConsumerConfig.class); ConsumerConfig config = mock(ConsumerConfig.class);
heartbeatRequestState = spy(new ShareHeartbeatRequestManager.HeartbeatRequestState( heartbeatRequestState = spy(new HeartbeatRequestState(
logContext, logContext,
time, time,
DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_HEARTBEAT_INTERVAL_MS,
@ -137,7 +137,7 @@ public class ShareHeartbeatRequestManagerTest {
} }
private void createHeartbeatRequestStateWithZeroHeartbeatInterval() { private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
heartbeatRequestState = spy(new ShareHeartbeatRequestManager.HeartbeatRequestState(logContext, heartbeatRequestState = spy(new HeartbeatRequestState(logContext,
time, time,
0, 0,
DEFAULT_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS,
@ -730,7 +730,7 @@ public class ShareHeartbeatRequestManagerTest {
final CoordinatorRequestManager coordinatorRequestManager, final CoordinatorRequestManager coordinatorRequestManager,
final ShareMembershipManager membershipManager, final ShareMembershipManager membershipManager,
final ShareHeartbeatRequestManager.HeartbeatState heartbeatState, final ShareHeartbeatRequestManager.HeartbeatState heartbeatState,
final ShareHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState, final HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler) { final BackgroundEventHandler backgroundEventHandler) {
LogContext logContext = new LogContext(); LogContext logContext = new LogContext();
pollTimer = time.timer(DEFAULT_MAX_POLL_INTERVAL_MS); pollTimer = time.timer(DEFAULT_MAX_POLL_INTERVAL_MS);