KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)

In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
`org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
`CloseOptions` class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
`timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeck<bbejeck@apache.org>
This commit is contained in:
Ken Huang 2025-10-07 20:50:18 +08:00 committed by GitHub
parent fa2496bb91
commit ebae768bd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 334 additions and 162 deletions

View File

@ -189,6 +189,13 @@
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
</li>
<li>
Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
<code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
<code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -159,7 +159,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
public class CloseOptions {
/**
* Enum to specify the group membership operation upon closing the Kafka Streams application.
*
* <ul>
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
* </ul>
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP
}
/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
*/
protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
private CloseOptions() {
}
protected CloseOptions(final CloseOptions closeOptions) {
this.operation = closeOptions.operation;
this.timeout = closeOptions.timeout;
}
/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the KafkaStreams to close.
* @return a new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
return new CloseOptions().withTimeout(timeout);
}
/**
* Static method to create a {@code CloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return a new {@code CloseOptions} instance with the specified group membership operation.
*/
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
return new CloseOptions().withGroupMembershipOperation(operation);
}
/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}
/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
this.operation = Objects.requireNonNull(operation, "operation should not be null");
return this;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.kafka.streams.errors.StreamsStoppedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.CloseOptionsInternal;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@ -488,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
deadThread.shutdown(false);
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
@ -1136,7 +1137,7 @@ public class KafkaStreams implements AutoCloseable {
return Optional.of(streamThread.getName());
} else {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
threads.remove(streamThread);
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@ -1200,7 +1201,7 @@ public class KafkaStreams implements AutoCloseable {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread {}", streamThread.getName());
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
if (callingThreadIsNotCurrentStreamThread) {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@ -1418,15 +1419,18 @@ public class KafkaStreams implements AutoCloseable {
/**
* Class that handles options passed in case of {@code KafkaStreams} instance scale down
*/
@Deprecated(since = "4.2")
public static class CloseOptions {
private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
private boolean leaveGroup = false;
@Deprecated(since = "4.2")
public CloseOptions timeout(final Duration timeout) {
this.timeout = timeout;
return this;
}
@Deprecated(since = "4.2")
public CloseOptions leaveGroup(final boolean leaveGroup) {
this.leaveGroup = leaveGroup;
return this;
@ -1438,10 +1442,14 @@ public class KafkaStreams implements AutoCloseable {
* This will block until all threads have stopped.
*/
public void close() {
close(Optional.empty(), false);
close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
private Thread shutdownHelper(
final boolean error,
final long timeoutMs,
final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
) {
stateDirCleaner.shutdownNow();
if (rocksDBMetricsRecordingService != null) {
rocksDBMetricsRecordingService.shutdownNow();
@ -1453,7 +1461,9 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
int numStreamThreads = processStreamThread(
streamThread -> streamThread.shutdown(operation)
);
log.info("Shutting down {} stream threads", numStreamThreads);
@ -1513,7 +1523,7 @@ public class KafkaStreams implements AutoCloseable {
}, clientId + "-CloseThread");
}
private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
@ -1544,7 +1554,7 @@ public class KafkaStreams implements AutoCloseable {
+ "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
}
final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1562,7 +1572,7 @@ public class KafkaStreams implements AutoCloseable {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in {}", state());
} else {
final Thread shutdownThread = shutdownHelper(true, -1, false);
final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1588,12 +1598,13 @@ public class KafkaStreams implements AutoCloseable {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), false);
return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
@ -1601,15 +1612,36 @@ public class KafkaStreams implements AutoCloseable {
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
@Deprecated(since = "4.2")
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
.withGroupMembershipOperation(options.leaveGroup ?
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
return close(closeOptions);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shut down,
* and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
* to trigger consumer leave call or remain in the group
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), options.leaveGroup);
return close(Optional.of(timeoutMs), optionsInternal.operation());
}
/**

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.CloseOptions;
import java.time.Duration;
import java.util.Optional;
public class CloseOptionsInternal extends CloseOptions {
public CloseOptionsInternal(final CloseOptions options) {
super(options);
}
public GroupMembershipOperation operation() {
return operation;
}
public Optional<Duration> timeout() {
return timeout;
}
}

View File

@ -91,9 +91,9 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -367,7 +367,8 @@ public class StreamThread extends Thread implements ProcessingThread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean stateUpdaterEnabled;
@ -898,7 +899,7 @@ public class StreamThread extends Thread implements ProcessingThread {
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
leaveGroupRequested.set(true);
leaveGroupRequested.set(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streamsUncaughtExceptionHandler.accept(e, false);
// Note: the above call currently rethrows the exception, so nothing below this line will be executed
} finally {
@ -1879,12 +1880,12 @@ public class StreamThread extends Thread implements ProcessingThread {
* Note that there is nothing to prevent this function from being called multiple times
* (e.g., in testing), hence the state is set only the first time
*
* @param leaveGroup this flag will control whether the consumer will leave the group on close or not
* @param operation the group membership operation to apply on shutdown. Must be one of LEAVE_GROUP or REMAIN_IN_GROUP.
*/
public void shutdown(final boolean leaveGroup) {
public void shutdown(final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
log.info("Informed to shut down");
final State oldState = setState(State.PENDING_SHUTDOWN);
leaveGroupRequested.set(leaveGroup);
leaveGroupRequested.set(operation);
if (oldState == State.CREATED) {
// The thread may not have been started. Take responsibility for shutting down
completeShutdown(true);
@ -1917,7 +1918,8 @@ public class StreamThread extends Thread implements ProcessingThread {
log.error("Failed to close changelog reader due to the following error:", e);
}
try {
final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
final GroupMembershipOperation membershipOperation =
leaveGroupRequested.get() == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? LEAVE_GROUP : REMAIN_IN_GROUP;
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
} catch (final Throwable e) {
log.error("Failed to close consumer due to the following error:", e);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
@ -310,8 +309,12 @@ public class KafkaStreamsTest {
private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
doAnswer(invocation -> {
supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.consumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
supplier.restoreConsumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
producer.close();
}
@ -320,7 +323,7 @@ public class KafkaStreamsTest {
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).when(thread).shutdown(false);
}).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private void prepareThreadLock(final StreamThread thread) {
@ -571,7 +574,7 @@ public class KafkaStreamsTest {
for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown(false);
tmpThread.shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
@ -790,7 +793,7 @@ public class KafkaStreamsTest {
prepareThreadLock(streamThreadTwo);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
streamThreadOne.shutdown(true);
streamThreadOne.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
assertThat(threads.size(), equalTo(1));
assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
@ -1016,9 +1019,8 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1041,8 +1043,7 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1229,8 +1230,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1243,8 +1243,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1257,8 +1256,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1275,9 +1273,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1293,9 +1290,8 @@ public class KafkaStreamsTest {
final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1312,9 +1308,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}

View File

@ -60,6 +60,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@ -247,7 +248,7 @@ public class StreamThreadTest {
if (thread.state() != State.CREATED) {
thread.taskManager().shutdown(false);
}
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
thread = null;
}
final Set<Thread> t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@ -409,7 +410,7 @@ public class StreamThreadTest {
assertEquals(4, stateListener.numChanges);
assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
}
@ -427,14 +428,14 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
thread.shutdown(true);
assertEquals(thread.state(), StreamThread.State.DEAD);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertEquals(State.DEAD, thread.state());
}
@ParameterizedTest
@ -812,7 +813,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
@ -880,7 +881,7 @@ public class StreamThreadTest {
() -> { }
);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we
// don't trigger one before we can shut down, since the rebalance must be ended
@ -1390,7 +1391,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
@ -1426,7 +1427,7 @@ public class StreamThreadTest {
thread.setStateListener(
(t, newState, oldState) -> {
if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
}
});
thread.run();
@ -1524,7 +1525,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
verify(taskManager).shutdown(true);
}
@ -1542,7 +1543,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Execute the run method. Verification of the mock will check that shutdown was only done once
thread.run();

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -341,9 +342,8 @@ public class DeleteStreamsGroupOffsetTest {
private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -512,9 +513,8 @@ public class DeleteStreamsGroupTest {
private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();