KAFKA-18185: remove internal.leave.group.on.close config (#19400)
CI / build (push) Waiting to run Details

JIRA: KAFKA-18185

This is a follow-up of #17614   The patch is to remove the
`internal.leave.group.on.close` config.

Reviewers: Sophie Blee-Goldman <ableegoldman@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>, Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
TengYao Chi 2025-10-04 01:21:38 +08:00 committed by GitHub
parent 28e7803037
commit 68f1da8474
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 62 additions and 192 deletions

View File

@ -46,7 +46,6 @@ public class GroupRebalanceConfig {
public final Optional<String> rackId;
public final long retryBackoffMs;
public final long retryBackoffMaxMs;
public final boolean leaveGroupOnClose;
public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
@ -80,13 +79,6 @@ public class GroupRebalanceConfig {
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG);
// Internal leave group config is only defined in Consumer.
if (protocolType == ProtocolType.CONSUMER) {
this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close");
} else {
this.leaveGroupOnClose = true;
}
}
// For testing purpose.
@ -97,8 +89,7 @@ public class GroupRebalanceConfig {
Optional<String> groupInstanceId,
String rackId,
long retryBackoffMs,
long retryBackoffMaxMs,
boolean leaveGroupOnClose) {
long retryBackoffMaxMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.heartbeatIntervalMs = heartbeatIntervalMs;
@ -107,6 +98,5 @@ public class GroupRebalanceConfig {
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
this.retryBackoffMs = retryBackoffMs;
this.retryBackoffMaxMs = retryBackoffMaxMs;
this.leaveGroupOnClose = leaveGroupOnClose;
}
}

View File

@ -330,17 +330,6 @@ public class ConsumerConfig extends AbstractConfig {
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
/**
* <code>internal.leave.group.on.close</code>
* Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
* won't occur until <code>session.timeout.ms</code> expires.
*
* <p>
* Note: this is an internal configuration and could be changed in the future in a backward incompatible way
*
*/
static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
/**
* <code>internal.throw.on.fetch.stable.offset.unsupported</code>
* Whether or not the consumer should throw when the new stable offset feature is supported.
@ -634,10 +623,6 @@ public class ConsumerConfig extends AbstractConfig {
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW)
.defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
Type.BOOLEAN,
false,

View File

@ -1170,7 +1170,7 @@ public abstract class AbstractCoordinator implements Closeable {
public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
RequestFuture<Void> future = null;
if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
if (shouldSendLeaveGroupRequest(membershipOperation)) {
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(

View File

@ -331,8 +331,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
groupInstanceId,
rackId,
retryBackoffMs,
retryBackoffMaxMs,
true
retryBackoffMaxMs
);
this.coordinator = new ConsumerCoordinator(
rebalanceConfig,

View File

@ -421,7 +421,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
@Override
public boolean isLeavingGroup() {
CloseOptions.GroupMembershipOperation leaveGroupOperation = leaveGroupOperation();
if (REMAIN_IN_GROUP == leaveGroupOperation) {
if (REMAIN_IN_GROUP == leaveGroupOperation && groupInstanceId.isEmpty()) {
return false;
}
@ -432,7 +432,8 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
boolean hasLeaveOperation = DEFAULT == leaveGroupOperation ||
// Leave operation: both static and dynamic consumers will send a leave heartbeat
LEAVE_GROUP == leaveGroupOperation ||
// Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
// Remain in group: static consumers will send a leave heartbeat with -2 epoch to reflect that a member using the given
// instance id decided to leave the group and would be back within the session timeout.
groupInstanceId().isPresent();
return isLeavingState && hasLeaveOperation;

View File

@ -135,12 +135,7 @@ public class AbstractCoordinatorTest {
Optional.empty(), Optional.empty());
}
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
setupCoordinator(retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier, groupInstanceId.isEmpty());
}
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L,
@ -168,8 +163,7 @@ public class AbstractCoordinatorTest {
groupInstanceId,
null,
retryBackoffMs,
retryBackoffMaxMs,
leaveOnClose);
retryBackoffMaxMs);
this.coordinator = new DummyCoordinator(rebalanceConfig,
consumerClient,
metrics,
@ -1109,7 +1103,7 @@ public class AbstractCoordinatorTest {
@ParameterizedTest
@MethodSource("groupInstanceIdAndMembershipOperationMatrix")
public void testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty(), true);
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty());
}
private static Stream<Arguments> groupInstanceIdAndMembershipOperationMatrix() {
@ -1124,11 +1118,11 @@ public class AbstractCoordinatorTest {
}
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(), groupInstanceId.isEmpty());
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty());
}
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose);
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier);
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));

View File

@ -224,8 +224,7 @@ public abstract class ConsumerCoordinatorTest {
groupInstanceId,
rackId,
retryBackoffMs,
retryBackoffMaxMs,
groupInstanceId.isEmpty());
retryBackoffMaxMs);
}
@AfterEach

View File

@ -47,8 +47,7 @@ public class HeartbeatTest {
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
heartbeat = new Heartbeat(rebalanceConfig, time);
}

View File

@ -149,8 +149,7 @@ public class WorkerCoordinatorIncrementalTest {
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
this.coordinator = new WorkerCoordinator(rebalanceConfig,
loggerFactory,
consumerClient,

View File

@ -140,8 +140,7 @@ public class WorkerCoordinatorTest {
Optional.empty(),
null,
retryBackoffMs,
retryBackoffMaxMs,
true);
retryBackoffMaxMs);
this.coordinator = new WorkerCoordinator(rebalanceConfig,
logContext,
consumerClient,

View File

@ -24,7 +24,7 @@ import java.lang.{Long => JLong}
import java.time.{Duration => JDuration}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Locale, Optional, Properties}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
@ -2355,9 +2355,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
// We need to set internal.leave.group.on.close to validate dynamic member removal, but it only works for ClassicConsumer
// After KIP-1092, we can control dynamic member removal for both ClassicConsumer and AsyncConsumer
defaultConsumerConfig.setProperty("internal.leave.group.on.close", "false")
val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
@ -2406,14 +2403,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
var testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
// Although we set `internal.leave.group.on.close` in the consumer, it only works for ClassicConsumer.
// After KIP-1092, we can control dynamic member removal in consumer.close()
if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(3, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
}
assertEquals(2, testGroupDescription.members().size())
// Test delete one static member
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId,
@ -2426,11 +2416,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(1, testGroupDescription.members().size())
}
assertEquals(1, testGroupDescription.members().size())
// Delete all active members remaining
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions())

View File

@ -36,6 +36,9 @@
The <code>KafkaPrincipalBuilder</code> now extends <code>KafkaPrincipalSerde</code>. Force developer to implement <code>KafkaPrincipalSerde</code> interface for custom <code>KafkaPrincipalBuilder</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/1gq9F">KIP-1157</a>.
</li>
<li>The behavior of <code>org.apache.kafka.streams.KafkaStreams#removeStreamThread</code> has been changed. The consumer has no longer remove once <code>removeStreamThread</code> finished.
Instead, consumer would be kicked off from the group after <code>org.apache.kafka.streams.processor.internals.StreamThread</code> completes its <code>run</code> function.
</li>
<li>
The support for MX4J library, enabled through <code>kafka_mx4jenable</code> system property, was deprecated and will be removed in Kafka 5.0.
</li>

View File

@ -19,9 +19,6 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaFuture;
@ -491,7 +488,7 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
deadThread.shutdown();
deadThread.shutdown(false);
addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
@ -981,7 +978,7 @@ public class KafkaStreams implements AutoCloseable {
// use client id instead of thread client id since this admin client may be shared among threads
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
metrics = createMetrics(applicationConfigs, time, clientId);
final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
metrics.addReporter(reporter);
@ -1139,7 +1136,7 @@ public class KafkaStreams implements AutoCloseable {
return Optional.of(streamThread.getName());
} else {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown();
streamThread.shutdown(true);
threads.remove(streamThread);
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@ -1159,6 +1156,10 @@ public class KafkaStreams implements AutoCloseable {
* The removed stream thread is gracefully shut down. This method does not specify which stream
* thread is shut down.
* <p>
* The consumer associated with the stream thread is closed using consumer.close() during the shutdown process.
* Note that this method does not guarantee immediate removal of the consumer from the consumer group.
* The consumer is only kicked off from the group after the stream thread completes its run function.
* <p>
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
@ -1171,17 +1172,15 @@ public class KafkaStreams implements AutoCloseable {
}
/**
* Removes one stream thread out of the running stream threads from this Kafka Streams client.
* <p>
* The removed stream thread is gracefully shut down. This method does not specify which stream
* thread is shut down.
* The consumer associated with the stream thread is closed using consumer.close() during the shutdown process.
* Note that this method does not guarantee immediate removal of the consumer from the consumer group.
* The consumer is only kicked off from the group after the stream thread completes its run function.
* <p>
* Since the number of stream threads decreases, the sizes of the caches in the remaining stream
* threads are adapted so that the sum of the cache sizes over all stream threads equals the total
* cache size specified in configuration {@link StreamsConfig#STATESTORE_CACHE_MAX_BYTES_CONFIG}.
*
* @param timeout The length of time to wait for the thread to shut down
* @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
* @return name of the removed stream thread or empty if a stream thread could not be removed because
* no stream threads are alive
*/
@ -1201,10 +1200,8 @@ public class KafkaStreams implements AutoCloseable {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread {}", streamThread.getName());
final Optional<String> groupInstanceID = streamThread.groupInstanceID();
streamThread.requestLeaveGroupDuringShutdown();
streamThread.shutdown();
if (!streamThread.getName().equals(Thread.currentThread().getName())) {
streamThread.shutdown(true);
if (callingThreadIsNotCurrentStreamThread) {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
log.warn("{} did not shutdown in the allotted time.", streamThread.getName());
@ -1224,46 +1221,6 @@ public class KafkaStreams implements AutoCloseable {
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
resizeThreadCache(cacheSizePerThread);
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
adminClient.removeMembersFromConsumerGroup(
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
try {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
} catch (final java.util.concurrent.TimeoutException exception) {
log.error(
String.format(
"Could not remove static member %s from consumer group %s due to a timeout:",
groupInstanceID.get(),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
),
exception
);
throw new TimeoutException(exception.getMessage(), exception);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException exception) {
log.error(
String.format(
"Could not remove static member %s from consumer group %s due to:",
groupInstanceID.get(),
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
),
exception
);
throw new StreamsException(
"Could not remove static member " + groupInstanceID.get()
+ " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
+ " for the following reason: ",
exception.getCause()
);
}
}
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0) {
throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
@ -1496,7 +1453,7 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
int numStreamThreads = processStreamThread(StreamThread::shutdown);
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
log.info("Shutting down {} stream threads", numStreamThreads);
@ -1516,10 +1473,6 @@ public class KafkaStreams implements AutoCloseable {
}
});
if (leaveGroup) {
processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
}
log.info("Shutdown {} stream threads complete", numStreamThreads);
if (globalStreamThread != null) {
@ -1659,33 +1612,6 @@ public class KafkaStreams implements AutoCloseable {
return close(Optional.of(timeoutMs), options.leaveGroup);
}
private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
return thread -> {
final Optional<String> groupInstanceId = thread.groupInstanceID();
if (groupInstanceId.isPresent()) {
log.debug("Sending leave group trigger to removing instance from consumer group: {}.",
groupInstanceId.get());
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient
.removeMembersFromConsumerGroup(
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
try {
removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
.get(remainingTimeMs, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
final String msg = String.format("Could not remove static member %s from consumer group %s.",
groupInstanceId.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG));
log.error(msg, e);
}
}
};
}
/**
* Do a cleanup of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.

View File

@ -1315,7 +1315,6 @@ public class StreamsConfig extends AbstractConfig {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
"internal.leave.group.on.close", false,
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
);

View File

@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@ -95,6 +97,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId;
@ -894,7 +898,7 @@ public class StreamThread extends Thread implements ProcessingThread {
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
requestLeaveGroupDuringShutdown();
leaveGroupRequested.set(true);
streamsUncaughtExceptionHandler.accept(e, false);
// Note: the above call currently rethrows the exception, so nothing below this line will be executed
} finally {
@ -1874,10 +1878,13 @@ public class StreamThread extends Thread implements ProcessingThread {
* <p>
* Note that there is nothing to prevent this function from being called multiple times
* (e.g., in testing), hence the state is set only the first time
*
* @param leaveGroup this flag will control whether the consumer will leave the group on close or not
*/
public void shutdown() {
public void shutdown(final boolean leaveGroup) {
log.info("Informed to shut down");
final State oldState = setState(State.PENDING_SHUTDOWN);
leaveGroupRequested.set(leaveGroup);
if (oldState == State.CREATED) {
// The thread may not have been started. Take responsibility for shutting down
completeShutdown(true);
@ -1910,18 +1917,13 @@ public class StreamThread extends Thread implements ProcessingThread {
log.error("Failed to close changelog reader due to the following error:", e);
}
try {
if (leaveGroupRequested.get()) {
mainConsumer.unsubscribe();
}
} catch (final Throwable e) {
log.error("Failed to unsubscribe due to the following error: ", e);
}
try {
mainConsumer.close();
final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
} catch (final Throwable e) {
log.error("Failed to close consumer due to the following error:", e);
}
try {
// restore consumer isn't part of a consumer group so we use REMAIN_IN_GROUP to skip any leaveGroup checks
restoreConsumer.close();
} catch (final Throwable e) {
log.error("Failed to close restore consumer due to the following error:", e);
@ -2039,10 +2041,6 @@ public class StreamThread extends Thread implements ProcessingThread {
return groupInstanceID;
}
public void requestLeaveGroupDuringShutdown() {
leaveGroupRequested.set(true);
}
public Map<MetricName, Metric> producerMetrics() {
return taskManager.producerMetrics();
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
@ -309,8 +310,8 @@ public class KafkaStreamsTest {
private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
doAnswer(invocation -> {
supplier.consumer.close();
supplier.restoreConsumer.close();
supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
producer.close();
}
@ -319,7 +320,7 @@ public class KafkaStreamsTest {
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).when(thread).shutdown();
}).when(thread).shutdown(false);
}
private void prepareThreadLock(final StreamThread thread) {
@ -570,7 +571,7 @@ public class KafkaStreamsTest {
for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown();
tmpThread.shutdown(false);
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
@ -789,7 +790,7 @@ public class KafkaStreamsTest {
prepareThreadLock(streamThreadTwo);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
streamThreadOne.shutdown();
streamThreadOne.shutdown(true);
final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
assertThat(threads.size(), equalTo(1));
assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));

View File

@ -597,13 +597,6 @@ public class StreamsConfigTest {
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@Test
public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false));
}
@Test
public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosDisabled() {
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);

View File

@ -247,7 +247,7 @@ public class StreamThreadTest {
if (thread.state() != State.CREATED) {
thread.taskManager().shutdown(false);
}
thread.shutdown();
thread.shutdown(true);
thread = null;
}
final Set<Thread> t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@ -409,7 +409,7 @@ public class StreamThreadTest {
assertEquals(4, stateListener.numChanges);
assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);
thread.shutdown();
thread.shutdown(true);
assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
}
@ -427,13 +427,13 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown();
thread.shutdown(true);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
thread.shutdown();
thread.shutdown(true);
assertEquals(thread.state(), StreamThread.State.DEAD);
}
@ -812,7 +812,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown();
thread.shutdown(true);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
@ -880,7 +880,7 @@ public class StreamThreadTest {
() -> { }
);
thread.shutdown();
thread.shutdown(true);
// Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we
// don't trigger one before we can shut down, since the rebalance must be ended
@ -1390,7 +1390,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown();
thread.shutdown(true);
// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
@ -1426,7 +1426,7 @@ public class StreamThreadTest {
thread.setStateListener(
(t, newState, oldState) -> {
if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
thread.shutdown();
thread.shutdown(true);
}
});
thread.run();
@ -1524,7 +1524,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown();
thread.shutdown(true);
verify(taskManager).shutdown(true);
}
@ -1542,7 +1542,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown();
thread.shutdown(true);
// Execute the run method. Verification of the mock will check that shutdown was only done once
thread.run();
@ -2812,7 +2812,6 @@ public class StreamThreadTest {
assertThat(exceptionHandlerInvoked.get(), is(true));
verify(consumer).subscribe((Collection<String>) any(), any());
verify(consumer).unsubscribe();
}
@ParameterizedTest