From d88a97adef684a0f5403c46f7fb2f8d1723eebd5 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 13 Mar 2024 21:52:25 -0700 Subject: [PATCH] MINOR: simplify consumer logic (#15519) For static member, the `group.instance.id` cannot change. Reviewers: Chia-Ping Tsai , Lianet Magrans , David Jacot --- .../consumer/internals/HeartbeatRequestManager.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index d2a7205d5f3..19e4a8b7132 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -529,14 +529,8 @@ public class HeartbeatRequestManager implements RequestManager { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); - // InstanceId - only sent if has changed since the last heartbeat - // Always send when leaving the group as a static member - membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { - if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { - data.setInstanceId(groupInstanceId); - sentFields.instanceId = groupInstanceId; - } - }); + // InstanceId - set if present + membershipManager.groupInstanceId().ifPresent(data::setInstanceId); // RebalanceTimeoutMs - only sent if has changed since the last heartbeat if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { @@ -593,7 +587,6 @@ public class HeartbeatRequestManager implements RequestManager { // Fields of ConsumerHeartbeatRequest sent in the most recent request static class SentFields { - private String instanceId = null; private int rebalanceTimeoutMs = -1; private TreeSet subscribedTopicNames = null; private String serverAssignor = null; @@ -602,7 +595,6 @@ public class HeartbeatRequestManager implements RequestManager { SentFields() {} void reset() { - instanceId = null; rebalanceTimeoutMs = -1; subscribedTopicNames = null; serverAssignor = null;