From 5b478aebfdf6f1d908b5ce76087058c68caa049a Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Tue, 12 Dec 2023 01:06:34 -0800 Subject: [PATCH] KAFKA-15818: ensure leave group on max poll interval (#14873) Currently, the max poll interval is not respected during consumer#poll. When the user stops polling the consumer, we should assume either the consumer is too slow to respond or is already dead. In either case, we should let the group coordinator kick the member out of the group and reassign its partitions after the rebalance timeout expires. If the consumer comes back alive, we should send a heartbeat and the member will be fenced and rejoin (and its partitions will be revoked). This is the same behavior as the current implementation. Reviewers: Lucas Brutschy, Bruno Cadonna, Lianet Magrans --- .../internals/HeartbeatRequestManager.java | 90 ++++++++++++++---- .../consumer/internals/MemberState.java | 14 ++- .../consumer/internals/MembershipManager.java | 16 ++++ .../internals/MembershipManagerImpl.java | 29 +++++- .../events/ApplicationEventProcessor.java | 5 +- .../internals/ConsumerTestBuilder.java | 4 + .../HeartbeatRequestManagerTest.java | 94 ++++++++++++++++++- .../internals/MembershipManagerImplTest.java | 27 ++++++ .../kafka/api/PlaintextConsumerTest.scala | 1 + 9 files changed, 253 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 3b099eb0766..0fb0346d5f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -75,7 +75,7 @@ public class HeartbeatRequestManager implements RequestManager { * Time that the group coordinator will wait on member to revoke its partitions. 
This is provided by the group * coordinator in the heartbeat */ - private final int rebalanceTimeoutMs; + private final int maxPollIntervalMs; /** * CoordinatorRequestManager manages the connection to the group coordinator @@ -102,6 +102,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; + /** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ + private final Timer pollTimer; + private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null; public HeartbeatRequestManager( @@ -116,17 +122,19 @@ public class HeartbeatRequestManager implements RequestManager { this.logger = logContext.logger(getClass()); this.membershipManager = membershipManager; this.backgroundEventHandler = backgroundEventHandler; - this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); - this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs); + this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs); this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, - retryBackoffMaxMs, rebalanceTimeoutMs); + retryBackoffMaxMs, maxPollIntervalMs); + this.pollTimer = time.timer(maxPollIntervalMs); } // Visible for testing HeartbeatRequestManager( final LogContext logContext, + final Timer timer, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, final MembershipManager membershipManager, @@ -134,12 +142,13 @@ public class HeartbeatRequestManager implements RequestManager { final 
HeartbeatRequestState heartbeatRequestState, final BackgroundEventHandler backgroundEventHandler) { this.logger = logContext.logger(this.getClass()); - this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); this.coordinatorRequestManager = coordinatorRequestManager; this.heartbeatRequestState = heartbeatRequestState; this.heartbeatState = heartbeatState; this.membershipManager = membershipManager; this.backgroundEventHandler = backgroundEventHandler; + this.pollTimer = timer; } /** @@ -167,20 +176,33 @@ public class HeartbeatRequestManager implements RequestManager { */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { - if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) { + if (!coordinatorRequestManager.coordinator().isPresent() || + membershipManager.shouldSkipHeartbeat() || + pollTimer.isExpired()) { membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; } + pollTimer.update(currentTimeMs); + if (pollTimer.isExpired()) { + logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + + "was longer than the configured max.poll.interval.ms, which typically implies that " + + "the poll loop is spending too much time processing messages. You can address this " + + "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + + "returned in poll() with max.poll.records."); + // This should trigger a heartbeat with leave group epoch + membershipManager.transitionToStaled(); + NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true); + // We can ignore the leave response because we can join before or after receiving the response. 
+ heartbeatRequestState.reset(); + return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); + } boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); - if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); } - heartbeatRequestState.onSendAttempt(currentTimeMs); - membershipManager.onHeartbeatRequestSent(); - NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(); + NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } @@ -199,15 +221,49 @@ public class HeartbeatRequestManager implements RequestManager { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } - private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { + /** + * When the consumer polls, we need to reset the pollTimer. While the poll timer is expired, heartbeats are + * skipped; resetting it lets the member resume heartbeating so it can rejoin the group. 
+ */ + public void resetPollTimer() { + pollTimer.reset(maxPollIntervalMs); + } + + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, + final boolean ignoreResponse) { + NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); + heartbeatRequestState.onSendAttempt(currentTimeMs); + membershipManager.onHeartbeatRequestSent(); + return request; + } + + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), coordinatorRequestManager.coordinator()); + if (ignoreResponse) + return logResponse(request); + else + return request.whenComplete((response, exception) -> { + if (response != null) { + onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs()); + } else { + onFailure(exception, request.handler().completionTimeMs()); + } + }); + } + + private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) { return request.whenComplete((response, exception) -> { if (response != null) { - onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs()); + Errors error = + Errors.forCode(((ConsumerGroupHeartbeatResponse) response.responseBody()).data().errorCode()); + if (error == Errors.NONE) + logger.debug("GroupHeartbeat responded successfully: {}", response); + else + logger.error("GroupHeartbeat failed because of {}: {}", error, response); } else { - onFailure(exception, request.handler().completionTimeMs()); + logger.error("GroupHeartbeat failed because of unexpected exception.", exception); } }); } @@ -229,10 +285,10 @@ public class HeartbeatRequestManager implements RequestManager { private void onResponse(final ConsumerGroupHeartbeatResponse response, long 
currentTimeMs) { if (Errors.forCode(response.data().errorCode()) == Errors.NONE) { - this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs()); - this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); - this.heartbeatRequestState.resetTimer(); - this.membershipManager.onHeartbeatResponseReceived(response.data()); + heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs()); + heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); + heartbeatRequestState.resetTimer(); + membershipManager.onHeartbeatResponseReceived(response.data()); maybeSendGroupMetadataUpdateEvent(); return; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java index 088d1ccd346..585440bafca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.protocol.Errors; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -99,7 +100,14 @@ public enum MemberState { * unrecoverable state where the member won't send any requests to the broker and cannot * perform any other transition. */ - FATAL; + FATAL, + + /** + * An intermediate state indicating the consumer is staled because the user has not polled the consumer + * within the max.poll.interval.ms time bound; therefore causing the member to leave the + * group. The member rejoins on the next poll. 
+ */ + STALED; // Valid state transitions static { @@ -116,7 +124,7 @@ public enum MemberState { FENCED.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, PREPARE_LEAVING, LEAVING); - JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED); + JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED, STALED); PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED); @@ -124,6 +132,8 @@ public enum MemberState { LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING); UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING); + + STALED.previousValidStates = Arrays.asList(JOINING, RECONCILING, ACKNOWLEDGING, STABLE); } private List previousValidStates; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index b635eb3a0fd..78127ecb206 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -61,6 +61,11 @@ public interface MembershipManager { */ MemberState state(); + /** + * @return True if the member is staled due to expired poll timer. + */ + boolean isStaled(); + /** * Update member info and transition member state based on a successful heartbeat response. * @@ -139,4 +144,15 @@ public interface MembershipManager { * Note that list of topics of the subscription is taken from the shared subscription state. */ void onSubscriptionUpdated(); + + /** + * Transition to the {@link MemberState#JOINING} state to attempt joining a group. + */ + void transitionToJoining(); + + /** + * When the user stops polling the consumer and the max.poll.interval.ms timer expires, we transition + * the member to {@link MemberState#STALED}. 
+ */ + void transitionToStaled(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 317126801ec..dfb339680c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -228,7 +228,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource * (callbacks executed and heartbeat request to leave is sent out). This will be empty is the * member is not leaving. */ - private Optional> leaveGroupInProgress; + private Optional> leaveGroupInProgress = Optional.empty(); /** * True if the member has registered to be notified when the cluster metadata is updated. @@ -315,6 +315,11 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource return memberEpoch; } + @Override + public boolean isStaled() { + return state == MemberState.STALED; + } + /** * {@inheritDoc} */ @@ -481,7 +486,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. * Visible for testing. 
*/ - void transitionToJoining() { + @Override + public void transitionToJoining() { if (state == MemberState.FATAL) { log.warn("No action taken to join the group with the updated subscription because " + "the member is in FATAL state"); @@ -601,7 +607,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource @Override public boolean shouldHeartbeatNow() { MemberState state = state(); - return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING; } /** @@ -610,6 +616,12 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource @Override public void onHeartbeatRequestSent() { MemberState state = state(); + if (isStaled()) { + log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); + transitionToJoining(); + return; + } + if (state == MemberState.ACKNOWLEDGING) { if (allPendingAssignmentsReconciled()) { transitionTo(MemberState.STABLE); @@ -655,6 +667,17 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; } + /** + * Sets the epoch to the leave group epoch and clears the assignments. The member will rejoin with + * the existing subscriptions on the next time user polls. + */ + @Override + public void transitionToStaled() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment.clear(); + transitionTo(MemberState.STALED); + } + /** * Reconcile the assignment that has been received from the server and for which topic names * are resolved, kept in the {@link #assignmentReadyToReconcile}. 
This will commit if needed, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 33233561a34..9c0bcdeefbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.MembershipManager; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; @@ -128,8 +129,8 @@ public class ApplicationEventProcessor extends EventProcessor return; } - CommitRequestManager manager = requestManagers.commitRequestManager.get(); - manager.updateAutoCommitTimer(event.pollTimeMs()); + requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs())); + requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer); } private void process(final CommitApplicationEvent event) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 28a68e9f751..ba0410c40b6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -35,6 +35,7 @@ 
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import java.io.Closeable; import java.time.Duration; @@ -80,6 +81,7 @@ public class ConsumerTestBuilder implements Closeable { final FetchConfig fetchConfig; final FetchBuffer fetchBuffer; final Metrics metrics; + final Timer pollTimer; final FetchMetricsManager metricsManager; final NetworkClientDelegate networkClientDelegate; final OffsetsRequestManager offsetsRequestManager; @@ -148,6 +150,7 @@ public class ConsumerTestBuilder implements Closeable { this.subscriptions = spy(createSubscriptionState(config, logContext)); this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners())); this.metricsManager = createFetchMetricsManager(metrics); + this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs); this.client = new MockClient(time, metadata); MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { @@ -219,6 +222,7 @@ public class ConsumerTestBuilder implements Closeable { gi.heartbeatJitterMs)); HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager( logContext, + pollTimer, config, coordinator, mm, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 0496e90b2b4..2d64c9ee2e7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -17,8 +17,9 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; -import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent; import org.apache.kafka.common.KafkaException; @@ -32,7 +33,10 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -48,28 +52,37 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.BlockingQueue; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR; +import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class HeartbeatRequestManagerTest { + private long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS; + private int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; + private int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS; + private long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS; + private static final String DEFAULT_GROUP_ID = "groupId"; private ConsumerTestBuilder testBuilder; private Time time; + private Timer pollTimer; + private ConsumerConfig config; private CoordinatorRequestManager coordinatorRequestManager; private SubscriptionState subscriptions; private Metadata metadata; @@ -99,6 +112,7 @@ public class HeartbeatRequestManagerTest { subscriptions = testBuilder.subscriptions; membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); metadata = testBuilder.metadata; + config = testBuilder.config; when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); } @@ -509,6 +523,41 @@ public class HeartbeatRequestManagerTest { assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); } + @Test + public void testPollTimerExpiration() { + heartbeatRequestManager = createHeartbeatRequestManager(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 
9999))); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); + + time.sleep(maxPollIntervalMs); + assertLeaveGroup(heartbeatRequestManager); + verify(membershipManager).transitionToStaled(); + + assertNoHeartbeat(heartbeatRequestManager); + heartbeatRequestManager.resetPollTimer(); + assertTrue(pollTimer.notExpired()); + assertHeartbeat(heartbeatRequestManager); + } + + private void assertHeartbeat(HeartbeatRequestManager hrm) { + NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); + assertEquals(1, pollResult.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs); + pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0), + Errors.NONE)); + } + + private void assertNoHeartbeat(HeartbeatRequestManager hrm) { + NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); + assertEquals(0, pollResult.unsentRequests.size()); + } + + private NetworkClientDelegate.PollResult assertLeaveGroup(HeartbeatRequestManager hrm) { + NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); + assertEquals(1, pollResult.unsentRequests.size()); + return pollResult; + } + private void mockStableMember() { membershipManager.onSubscriptionUpdated(); // Heartbeat response without assignment to set the state to STABLE. 
@@ -591,4 +640,43 @@ public class HeartbeatRequestManagerTest { null, response); } + + private ConsumerConfig config() { + Properties prop = new Properties(); + prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs)); + prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs)); + prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(retryBackoffMaxMs)); + prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs)); + return new ConsumerConfig(prop); + } + + private HeartbeatRequestManager createHeartbeatRequestManager() { + LogContext logContext = new LogContext(); + coordinatorRequestManager = mock(CoordinatorRequestManager.class); + subscriptions = mock(SubscriptionState.class); + membershipManager = mock(MembershipManager.class); + backgroundEventHandler = mock(BackgroundEventHandler.class); + heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs); + heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( + logContext, + time, + heartbeatIntervalMs, + retryBackoffMs, + retryBackoffMaxMs, + 0); + pollTimer = time.timer(maxPollIntervalMs); + return new HeartbeatRequestManager( + logContext, + pollTimer, + config(), + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 30b1c1aea26..9f03b130dca 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -42,6 +42,7 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -1044,6 +1045,22 @@ public class MembershipManagerImplTest { assertEquals(0, rebalanceListener.revokedCount); } + @Test + public void testTransitionToStaled() { + MembershipManager membershipManager = memberJoinWithAssignment("topic", Uuid.randomUuid()); + membershipManager.transitionToStaled(); + assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch()); + assertTrue(membershipManager.currentAssignment().isEmpty()); + } + + @Test + public void testHeartbeatSentOnStaledMember() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + membershipManager.transitionToStaled(); + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.JOINING, membershipManager.state()); + } + private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment( Uuid topicId, String topicName, List partitions) { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); @@ -1383,4 +1400,14 @@ public class MembershipManagerImplTest { .setPartitions(Arrays.asList(3, 4, 5)) )); } + + private MembershipManager memberJoinWithAssignment(String topicName, Uuid topicId) { + MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true); + membershipManager.onHeartbeatRequestSent(); + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + 
receiveAssignment(topicId, Collections.singletonList(0), membershipManager); + membershipManager.onHeartbeatRequestSent(); + assertFalse(membershipManager.currentAssignment().isEmpty()); + return membershipManager; + } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 5631c27721d..b7768097b07 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -948,6 +948,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // 1 consumer using range assignment this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "range-group") this.consumerConfig.setProperty(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range") + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000") val consumer = createConsumer() // create two new topics, each having 2 partitions