From 68f1da84740092dbd5ebb49ae62035174758b98b Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Sat, 4 Oct 2025 01:21:38 +0800 Subject: [PATCH] KAFKA-18185: remove internal.leave.group.on.close config (#19400) JIRA: KAFKA-18185 This is a follow-up of #17614 The patch is to remove the `internal.leave.group.on.close` config. Reviewers: Sophie Blee-Goldman , Chia-Ping Tsai , Bill Bejeck --- .../kafka/clients/GroupRebalanceConfig.java | 12 +-- .../clients/consumer/ConsumerConfig.java | 15 --- .../internals/AbstractCoordinator.java | 2 +- .../internals/ClassicKafkaConsumer.java | 3 +- .../internals/ConsumerMembershipManager.java | 5 +- .../internals/AbstractCoordinatorTest.java | 16 +-- .../internals/ConsumerCoordinatorTest.java | 3 +- .../consumer/internals/HeartbeatTest.java | 3 +- .../WorkerCoordinatorIncrementalTest.java | 3 +- .../distributed/WorkerCoordinatorTest.java | 3 +- .../api/PlaintextAdminIntegrationTest.scala | 20 +--- docs/upgrade.html | 3 + .../apache/kafka/streams/KafkaStreams.java | 100 +++--------------- .../apache/kafka/streams/StreamsConfig.java | 1 - .../processor/internals/StreamThread.java | 26 +++-- .../kafka/streams/KafkaStreamsTest.java | 11 +- .../kafka/streams/StreamsConfigTest.java | 7 -- .../processor/internals/StreamThreadTest.java | 21 ++-- 18 files changed, 62 insertions(+), 192 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java index fce243ebc64..9162fa4d4c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java @@ -46,7 +46,6 @@ public class GroupRebalanceConfig { public final Optional rackId; public final long retryBackoffMs; public final long retryBackoffMaxMs; - public final boolean leaveGroupOnClose; public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) { this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG); @@ -80,13 +79,6 @@ public class GroupRebalanceConfig { this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); this.retryBackoffMaxMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG); - - // Internal leave group config is only defined in Consumer. - if (protocolType == ProtocolType.CONSUMER) { - this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close"); - } else { - this.leaveGroupOnClose = true; - } } // For testing purpose. @@ -97,8 +89,7 @@ public class GroupRebalanceConfig { Optional groupInstanceId, String rackId, long retryBackoffMs, - long retryBackoffMaxMs, - boolean leaveGroupOnClose) { + long retryBackoffMaxMs) { this.sessionTimeoutMs = sessionTimeoutMs; this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.heartbeatIntervalMs = heartbeatIntervalMs; @@ -107,6 +98,5 @@ public class GroupRebalanceConfig { this.rackId = rackId == null || rackId.isEmpty() ? 
Optional.empty() : Optional.of(rackId); this.retryBackoffMs = retryBackoffMs; this.retryBackoffMaxMs = retryBackoffMaxMs; - this.leaveGroupOnClose = leaveGroupOnClose; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ee62dc9561b..43e793f4af1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -330,17 +330,6 @@ public class ConsumerConfig extends AbstractConfig { "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic."; public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; - /** - * internal.leave.group.on.close - * Whether or not the consumer should leave the group on close. If set to false then a rebalance - * won't occur until session.timeout.ms expires. - * - *

- * Note: this is an internal configuration and could be changed in the future in a backward incompatible way - * - */ - static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; - /** * internal.throw.on.fetch.stable.offset.unsupported * Whether or not the consumer should throw when the new stable offset feature is supported. @@ -634,10 +623,6 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) - .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, - Type.BOOLEAN, - true, - Importance.LOW) .defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, Type.BOOLEAN, false, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a07e12a518a..4098a1fea88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -1170,7 +1170,7 @@ public abstract class AbstractCoordinator implements Closeable { public synchronized RequestFuture maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) { RequestFuture future = null; - if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) { + if (shouldSendLeaveGroupRequest(membershipOperation)) { log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 787d710535e..c227a9511b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -331,8 +331,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { groupInstanceId, rackId, retryBackoffMs, - retryBackoffMaxMs, - true + retryBackoffMaxMs ); this.coordinator = new ConsumerCoordinator( rebalanceConfig, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java index 25e523c3a0d..82c209ac128 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java @@ -421,7 +421,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager groupInstanceId, Optional> heartbeatThreadSupplier) { - setupCoordinator(retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier, groupInstanceId.isEmpty()); - } - - private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional groupInstanceId, Optional> heartbeatThreadSupplier, boolean leaveOnClose) { LogContext logContext = new LogContext(); this.mockTime = new MockTime(); ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L, @@ -168,8 +163,7 @@ public class AbstractCoordinatorTest { groupInstanceId, 
null, retryBackoffMs, - retryBackoffMaxMs, - leaveOnClose); + retryBackoffMaxMs); this.coordinator = new DummyCoordinator(rebalanceConfig, consumerClient, metrics, @@ -1109,7 +1103,7 @@ public class AbstractCoordinatorTest { @ParameterizedTest @MethodSource("groupInstanceIdAndMembershipOperationMatrix") public void testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional groupInstanceId, CloseOptions.GroupMembershipOperation operation) { - checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty(), true); + checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty()); } private static Stream groupInstanceIdAndMembershipOperationMatrix() { @@ -1124,11 +1118,11 @@ public class AbstractCoordinatorTest { } private void checkLeaveGroupRequestSent(Optional groupInstanceId) { - checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(), groupInstanceId.isEmpty()); + checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty()); } - private void checkLeaveGroupRequestSent(Optional groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional> heartbeatThreadSupplier, boolean leaveOnClose) { - setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose); + private void checkLeaveGroupRequestSent(Optional groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional> heartbeatThreadSupplier) { + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 623fd765f39..a839618cf7c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -224,8 +224,7 @@ public abstract class ConsumerCoordinatorTest { groupInstanceId, rackId, retryBackoffMs, - retryBackoffMaxMs, - groupInstanceId.isEmpty()); + retryBackoffMaxMs); } @AfterEach diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java index de7937673c8..2f3a8789299 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java @@ -47,8 +47,7 @@ public class HeartbeatTest { Optional.empty(), null, retryBackoffMs, - retryBackoffMaxMs, - true); + retryBackoffMaxMs); heartbeat = new Heartbeat(rebalanceConfig, time); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index f867183a324..047a3b5d4e9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -149,8 +149,7 @@ public class WorkerCoordinatorIncrementalTest { Optional.empty(), null, retryBackoffMs, - retryBackoffMaxMs, - true); + retryBackoffMaxMs); this.coordinator = new WorkerCoordinator(rebalanceConfig, loggerFactory, consumerClient, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index c4bed410abb..46a5b076a33 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -140,8 +140,7 @@ public class WorkerCoordinatorTest { Optional.empty(), null, retryBackoffMs, - retryBackoffMaxMs, - true); + retryBackoffMaxMs); this.coordinator = new WorkerCoordinator(rebalanceConfig, logContext, consumerClient, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 4a686f3d4d8..c8e26445922 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -24,7 +24,7 @@ import java.lang.{Long => JLong} import java.time.{Duration => JDuration} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} -import java.util.{Collections, Locale, Optional, Properties} +import java.util.{Collections, Optional, Properties} import java.{time, util} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig @@ -2355,9 +2355,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) - // We need to set internal.leave.group.on.close to validate dynamic member removal, but it only works for ClassicConsumer - // After KIP-1092, we can control dynamic member removal for both ClassicConsumer and AsyncConsumer - defaultConsumerConfig.setProperty("internal.leave.group.on.close", "false") val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig) groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) => @@ -2406,14 +2403,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { var testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() assertEquals(testGroupId, testGroupDescription.groupId) assertFalse(testGroupDescription.isSimpleConsumerGroup) - - // Although we set `internal.leave.group.on.close` in the consumer, it only works for ClassicConsumer. 
- // After KIP-1092, we can control dynamic member removal in consumer.close() - if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) { - assertEquals(3, testGroupDescription.members().size()) - } else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) { - assertEquals(2, testGroupDescription.members().size()) - } + assertEquals(2, testGroupDescription.members().size()) // Test delete one static member removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, @@ -2426,11 +2416,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() - if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) { - assertEquals(2, testGroupDescription.members().size()) - } else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) { - assertEquals(1, testGroupDescription.members().size()) - } + assertEquals(1, testGroupDescription.members().size()) // Delete all active members remaining removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions()) diff --git a/docs/upgrade.html b/docs/upgrade.html index b5501bd2a74..745076c0785 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -36,6 +36,9 @@ The KafkaPrincipalBuilder now extends KafkaPrincipalSerde. Force developer to implement KafkaPrincipalSerde interface for custom KafkaPrincipalBuilder. For further details, please refer to KIP-1157. +

  • The behavior of org.apache.kafka.streams.KafkaStreams#removeStreamThread has changed. The consumer is no longer removed from the consumer group as soon as removeStreamThread returns. + Instead, the consumer leaves the group only after org.apache.kafka.streams.processor.internals.StreamThread completes its run function. +
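With internal.leave.group.on.close gone, the only way to control group membership on close is the public CloseOptions API from KIP-1092, which is also what StreamThread now uses on shutdown. A minimal, self-contained sketch of that call path (bootstrap servers, group id, and topic name are placeholders):

import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.List;
import java.util.Properties;

public class CloseOptionsSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id

        final KafkaConsumer<byte[], byte[]> consumer =
            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        consumer.subscribe(List.of("example-topic")); // placeholder topic

        // The membership operation decides whether this member leaves the group right away
        // (triggering a rebalance) or stays in it until session.timeout.ms expires.
        final boolean leaveGroup = true;
        final CloseOptions.GroupMembershipOperation operation = leaveGroup
            ? CloseOptions.GroupMembershipOperation.LEAVE_GROUP
            : CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
        consumer.close(CloseOptions.groupMembershipOperation(operation));
    }
}

This mirrors the new completeShutdown path in StreamThread, where leaveGroupRequested selects between LEAVE_GROUP and REMAIN_IN_GROUP.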
  • The support for MX4J library, enabled through kafka_mx4jenable system property, was deprecated and will be removed in Kafka 5.0.
  • diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 384a6eeccd2..dbf101fd9af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -19,9 +19,6 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; -import org.apache.kafka.clients.admin.MemberToRemove; -import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions; -import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.KafkaFuture; @@ -491,7 +488,7 @@ public class KafkaStreams implements AutoCloseable { closeToError(); } final StreamThread deadThread = (StreamThread) Thread.currentThread(); - deadThread.shutdown(); + deadThread.shutdown(false); addStreamThread(); if (throwable instanceof RuntimeException) { throw (RuntimeException) throwable; @@ -981,7 +978,7 @@ public class KafkaStreams implements AutoCloseable { // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))); - + metrics = createMetrics(applicationConfigs, time, clientId); final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId); metrics.addReporter(reporter); @@ -1139,7 +1136,7 @@ public class KafkaStreams implements AutoCloseable { return Optional.of(streamThread.getName()); } else { log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state); - streamThread.shutdown(); + streamThread.shutdown(true); threads.remove(streamThread); final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread); @@ -1159,6 +1156,10 @@ public class KafkaStreams implements AutoCloseable { * The removed stream thread is gracefully shut down. This method does not specify which stream * thread is shut down. *

    + * The consumer associated with the stream thread is closed using consumer.close() during the shutdown process. + * Note that this method does not guarantee immediate removal of the consumer from the consumer group. + * The consumer leaves the group only after the stream thread completes its run function. + *

    * Since the number of stream threads decreases, the sizes of the caches in the remaining stream * threads are adapted so that the sum of the cache sizes over all stream threads equals the total * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}. @@ -1171,17 +1172,15 @@ public class KafkaStreams implements AutoCloseable { } /** - * Removes one stream thread out of the running stream threads from this Kafka Streams client. - *

    - * The removed stream thread is gracefully shut down. This method does not specify which stream - * thread is shut down. + * The consumer associated with the stream thread is closed using consumer.close() during the shutdown process. + * Note that this method does not guarantee immediate removal of the consumer from the consumer group. + * The consumer leaves the group only after the stream thread completes its run function. *

    * Since the number of stream threads decreases, the sizes of the caches in the remaining stream * threads are adapted so that the sum of the cache sizes over all stream threads equals the total * cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}. * * @param timeout The length of time to wait for the thread to shut down - * @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time * @return name of the removed stream thread or empty if a stream thread could not be removed because * no stream threads are alive */ @@ -1201,10 +1200,8 @@ public class KafkaStreams implements AutoCloseable { final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName()); if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) { log.info("Removing StreamThread {}", streamThread.getName()); - final Optional groupInstanceID = streamThread.groupInstanceID(); - streamThread.requestLeaveGroupDuringShutdown(); - streamThread.shutdown(); - if (!streamThread.getName().equals(Thread.currentThread().getName())) { + streamThread.shutdown(true); + if (callingThreadIsNotCurrentStreamThread) { final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs); if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) { log.warn("{} did not shutdown in the allotted time.", streamThread.getName()); @@ -1224,46 +1221,6 @@ public class KafkaStreams implements AutoCloseable { final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread); resizeThreadCache(cacheSizePerThread); - if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { - final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get()); - final Collection membersToRemove = Collections.singletonList(memberToRemove); - final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = - adminClient.removeMembersFromConsumerGroup( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), - new RemoveMembersFromConsumerGroupOptions(membersToRemove) - ); - try { - final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs); - removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS); - } catch (final java.util.concurrent.TimeoutException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to a timeout:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new TimeoutException(exception.getMessage(), exception); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (final ExecutionException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new StreamsException( - "Could not remove static member " + groupInstanceID.get() - + " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - + " for the following reason: ", - exception.getCause() - ); - } - } final long remainingTimeMs = timeoutMs - (time.milliseconds() - 
startMs); if (remainingTimeMs <= 0) { throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); @@ -1496,7 +1453,7 @@ public class KafkaStreams implements AutoCloseable { return new Thread(() -> { // notify all the threads to stop; avoid deadlocks by stopping any // further state reports from the thread since we're shutting down - int numStreamThreads = processStreamThread(StreamThread::shutdown); + int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup)); log.info("Shutting down {} stream threads", numStreamThreads); @@ -1516,10 +1473,6 @@ public class KafkaStreams implements AutoCloseable { } }); - if (leaveGroup) { - processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs)); - } - log.info("Shutdown {} stream threads complete", numStreamThreads); if (globalStreamThread != null) { @@ -1659,33 +1612,6 @@ public class KafkaStreams implements AutoCloseable { return close(Optional.of(timeoutMs), options.leaveGroup); } - private Consumer streamThreadLeaveConsumerGroup(final long remainingTimeMs) { - return thread -> { - final Optional groupInstanceId = thread.groupInstanceID(); - if (groupInstanceId.isPresent()) { - log.debug("Sending leave group trigger to removing instance from consumer group: {}.", - groupInstanceId.get()); - final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get()); - final Collection membersToRemove = Collections.singletonList(memberToRemove); - - final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient - .removeMembersFromConsumerGroup( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), - new RemoveMembersFromConsumerGroupOptions(membersToRemove) - ); - - try { - removeMembersFromConsumerGroupResult.memberResult(memberToRemove) - .get(remainingTimeMs, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - final String msg = String.format("Could not remove static member %s from consumer group %s.", - groupInstanceId.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)); - log.error(msg, e); - } - } - }; - } - /** * Do a cleanup of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 772efe6be99..2050db40443 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1315,7 +1315,6 @@ public class StreamsConfig extends AbstractConfig { ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", - "internal.leave.group.on.close", false, ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic" ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 91511da5ee0..4c16577972b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.CloseOptions; +import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -95,6 +97,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP; +import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP; import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled; import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId; @@ -894,7 +898,7 @@ public class StreamThread extends Thread implements ProcessingThread { cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); - requestLeaveGroupDuringShutdown(); + leaveGroupRequested.set(true); streamsUncaughtExceptionHandler.accept(e, false); // Note: the above call currently rethrows the exception, so nothing below this line will be executed } finally { @@ -1874,10 +1878,13 @@ public class StreamThread extends Thread implements ProcessingThread { *

    * Note that there is nothing to prevent this function from being called multiple times * (e.g., in testing), hence the state is set only the first time + * + * @param leaveGroup this flag will control whether the consumer will leave the group on close or not */ - public void shutdown() { + public void shutdown(final boolean leaveGroup) { log.info("Informed to shut down"); final State oldState = setState(State.PENDING_SHUTDOWN); + leaveGroupRequested.set(leaveGroup); if (oldState == State.CREATED) { // The thread may not have been started. Take responsibility for shutting down completeShutdown(true); @@ -1910,18 +1917,13 @@ public class StreamThread extends Thread implements ProcessingThread { log.error("Failed to close changelog reader due to the following error:", e); } try { - if (leaveGroupRequested.get()) { - mainConsumer.unsubscribe(); - } - } catch (final Throwable e) { - log.error("Failed to unsubscribe due to the following error: ", e); - } - try { - mainConsumer.close(); + final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP; + mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation)); } catch (final Throwable e) { log.error("Failed to close consumer due to the following error:", e); } try { + // restore consumer isn't part of a consumer group so we use REMAIN_IN_GROUP to skip any leaveGroup checks restoreConsumer.close(); } catch (final Throwable e) { log.error("Failed to close restore consumer due to the following error:", e); @@ -2039,10 +2041,6 @@ public class StreamThread extends Thread implements ProcessingThread { return groupInstanceID; } - public void requestLeaveGroupDuringShutdown() { - leaveGroupRequested.set(true); - } - public Map producerMetrics() { return taskManager.producerMetrics(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index d417b5be2ed..ab292e572e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.consumer.CloseOptions; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaFuture; @@ -309,8 +310,8 @@ public class KafkaStreamsTest { private void prepareConsumer(final StreamThread thread, final AtomicReference state) { doAnswer(invocation -> { - supplier.consumer.close(); - supplier.restoreConsumer.close(); + supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)); + supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)); for (final MockProducer producer : supplier.producers) { producer.close(); } @@ -319,7 +320,7 @@ public class KafkaStreamsTest { threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING); threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); return null; - }).when(thread).shutdown(); + }).when(thread).shutdown(false); } private void prepareThreadLock(final 
StreamThread thread) { @@ -570,7 +571,7 @@ public class KafkaStreamsTest { for (int i = 0; i < NUM_THREADS; i++) { final StreamThread tmpThread = streams.threads.get(i); - tmpThread.shutdown(); + tmpThread.shutdown(false); waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, "Thread never stopped."); streams.threads.get(i).join(); @@ -789,7 +790,7 @@ public class KafkaStreamsTest { prepareThreadLock(streamThreadTwo); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - streamThreadOne.shutdown(); + streamThreadOne.shutdown(true); final Set threads = streams.metadataForLocalThreads(); assertThat(threads.size(), equalTo(1)); assertThat(threads, hasItem(streamThreadTwo.threadMetadata())); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index bd8002782d2..65147a81101 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -597,13 +597,6 @@ public class StreamsConfigTest { assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } - @Test - public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); - assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false)); - } - @Test public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosDisabled() { final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e53474db74a..44c4ea9a869 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -247,7 +247,7 @@ public class StreamThreadTest { if (thread.state() != State.CREATED) { thread.taskManager().shutdown(false); } - thread.shutdown(); + thread.shutdown(true); thread = null; } final Set t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet()); @@ -409,7 +409,7 @@ public class StreamThreadTest { assertEquals(4, stateListener.numChanges); assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState); - thread.shutdown(); + thread.shutdown(true); assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state()); } @@ -427,13 +427,13 @@ public class StreamThreadTest { 10 * 1000, "Thread never started."); - thread.shutdown(); + thread.shutdown(true); TestUtils.waitForCondition( () -> thread.state() == StreamThread.State.DEAD, 10 * 1000, "Thread never shut down."); - thread.shutdown(); + thread.shutdown(true); assertEquals(thread.state(), StreamThread.State.DEAD); } @@ -812,7 +812,7 @@ public class StreamThreadTest { 10 * 1000, "Thread never started."); - thread.shutdown(); + thread.shutdown(true); TestUtils.waitForCondition( () -> thread.state() == StreamThread.State.DEAD, 10 * 1000, @@ -880,7 +880,7 @@ public class StreamThreadTest { () -> { } ); - thread.shutdown(); + thread.shutdown(true); // Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE 
so we // don't trigger one before we can shut down, since the rebalance must be ended @@ -1390,7 +1390,7 @@ public class StreamThreadTest { 10 * 1000, "Thread never started."); - thread.shutdown(); + thread.shutdown(true); // even if thread is no longer running, it should still be polling // as long as the rebalance is still ongoing @@ -1426,7 +1426,7 @@ public class StreamThreadTest { thread.setStateListener( (t, newState, oldState) -> { if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) { - thread.shutdown(); + thread.shutdown(true); } }); thread.run(); @@ -1524,7 +1524,7 @@ public class StreamThreadTest { topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(adminClientId(CLIENT_ID)); - thread.shutdown(); + thread.shutdown(true); verify(taskManager).shutdown(true); } @@ -1542,7 +1542,7 @@ public class StreamThreadTest { topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) .updateThreadMetadata(adminClientId(CLIENT_ID)); - thread.shutdown(); + thread.shutdown(true); // Execute the run method. Verification of the mock will check that shutdown was only done once thread.run(); @@ -2812,7 +2812,6 @@ public class StreamThreadTest { assertThat(exceptionHandlerInvoked.get(), is(true)); verify(consumer).subscribe((Collection) any(), any()); - verify(consumer).unsubscribe(); } @ParameterizedTest
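From the caller's side the Streams API is unchanged; only the mechanism behind KafkaStreams.CloseOptions#leaveGroup moves from Admin#removeMembersFromConsumerGroup into each StreamThread's own consumer close. A rough usage sketch, assuming the builder-style CloseOptions setters (application id, bootstrap servers, and topic names are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.time.Duration;
import java.util.Properties;

public class LeaveGroupOnCloseSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("example-input").to("example-output"); // placeholder topics

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // leaveGroup(true) is now forwarded to StreamThread#shutdown(true), so each thread's
        // main consumer closes with GroupMembershipOperation.LEAVE_GROUP instead of the member
        // being removed from the group through the admin client.
        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions()
            .leaveGroup(true)
            .timeout(Duration.ofSeconds(30));
        streams.close(closeOptions);
    }
}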