KAFKA-19190: Handle shutdown application correctly (#19544)

If the streams rebalance protocol is enabled in
StreamsUncaughtExceptionHandlerIntegrationTest, the streams application
does not shut down correctly upon error.

There are two causes for this. First, sometimes, the SHUTDOWN_APPLICATION
code only sent with the leave heartbeat, but that is not handled broker
side. Second, the SHUTDOWN_APPLICATION code wasn't properly handled
client-side at all.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck
 <bill@confluent.io>, PoAn Yang <payang@apache.org>
This commit is contained in:
Lucas Brutschy 2025-04-25 09:56:09 +02:00 committed by GitHub
parent 2ce7c44612
commit 732ed0696b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 196 additions and 36 deletions

View File

@ -521,11 +521,14 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
}
List<StreamsGroupHeartbeatResponseData.Status> statuses = data.status();
if (statuses != null && !statuses.isEmpty()) {
String statusDetails = statuses.stream()
.map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
.collect(Collectors.joining(", "));
logger.warn("Membership is in the following statuses: {}", statusDetails);
if (statuses != null) {
streamsRebalanceData.setStatuses(statuses);
if (!statuses.isEmpty()) {
String statusDetails = statuses.stream()
.map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
.collect(Collectors.joining(", "));
logger.warn("Membership is in the following statuses: {}", statusDetails);
}
}
membershipManager.onHeartbeatSuccess(response);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import java.util.Collection;
import java.util.Collections;
@ -300,6 +301,8 @@ public class StreamsRebalanceData {
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private final AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new AtomicReference<>(List.of());
public StreamsRebalanceData(final UUID processId,
final Optional<HostInfo> endpoint,
final Map<String, Subtopology> subtopologies,
@ -346,11 +349,24 @@ public class StreamsRebalanceData {
return partitionsByHost.get();
}
/** For the current stream thread to request a shutdown of all Streams clients belonging to the same application. */
public void requestShutdown() {
shutdownRequested.set(true);
}
/** True if the current stream thread requested a shutdown of all Streams clients belonging to the same application. */
public boolean shutdownRequested() {
return shutdownRequested.get();
}
/** Updated whenever the status of the streams group is updated. */
public void setStatuses(final List<StreamsGroupHeartbeatResponseData.Status> s) {
statuses.set(s);
}
/** For communicating the current status of the group to the stream thread */
public List<StreamsGroupHeartbeatResponseData.Status> statuses() {
return statuses.get();
}
}

View File

@ -420,4 +420,21 @@ public class StreamsRebalanceDataTest {
assertFalse(streamsRebalanceData.shutdownRequested());
}
@Test
public void streamsRebalanceDataShouldBeConstructedWithEmptyStatuses() {
final UUID processId = UUID.randomUUID();
final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
final Map<String, StreamsRebalanceData.Subtopology> subtopologies = Map.of();
final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
processId,
endpoint,
subtopologies,
clientTags
);
assertTrue(streamsRebalanceData.statuses().isEmpty());
}
}

View File

@ -3979,9 +3979,13 @@ public class GroupMetadataManager {
String groupId,
String instanceId,
String memberId,
int memberEpoch
int memberEpoch,
boolean shutdownApplication
) throws ApiException {
StreamsGroup group = streamsGroup(groupId);
if (shutdownApplication) {
group.setShutdownRequestMemberId(memberId);
}
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch);
@ -4734,7 +4738,8 @@ public class GroupMetadataManager {
request.groupId(),
request.instanceId(),
request.memberId(),
request.memberEpoch()
request.memberEpoch(),
request.shutdownApplication()
);
} else {
return streamsGroupHeartbeat(

View File

@ -16371,6 +16371,97 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(List.of(), result2.records());
}
@Test
public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
Uuid fooTopicId = Uuid.randomUuid();
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.build())
.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.build())
.withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.build())
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
.withPartitionMetadata(Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
))
)
.build();
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result1 = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setShutdownApplication(true)
);
String statusDetail = String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", memberId1);
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
result1.response().data()
);
assertRecordsEquals(
List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)
),
result1.records()
);
for (CoordinatorRecord record : result1.records()) {
context.replay(record);
}
assignor.prepareGroupAssignment(
Map.of(memberId1, TasksTuple.EMPTY)
);
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result2 = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
);
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(12)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
)),
result2.response().data()
);
}
@Test
public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
String groupId = "fooup";

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -52,9 +53,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
@ -62,6 +64,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -86,7 +89,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class StreamsUncaughtExceptionHandlerIntegrationTest {
private static final long NOW = Instant.now().toEpochMilli();
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
@BeforeAll
public static void startCluster() throws IOException {
@ -111,7 +114,13 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
private Properties properties;
private Properties basicProps() {
private Properties basicProps(final boolean streamsRebalanceProtocolEnabled) {
final String protocol;
if (streamsRebalanceProtocolEnabled) {
protocol = GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault());
} else {
protocol = GroupProtocol.CLASSIC.name().toLowerCase(Locale.getDefault());
}
return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
@ -120,7 +129,8 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10000)
mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10000),
mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
)
);
}
@ -136,7 +146,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, inputTopic2, outputTopic, outputTopic2);
final KStream<String, String> stream = builder.stream(inputTopic);
stream.process(() -> new ShutdownProcessor<>(processorValueCollector), Named.as("process"));
properties = basicProps();
}
@AfterEach
@ -144,8 +153,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
purgeLocalStreamsState(properties);
}
@Test
public void shouldShutdownClient() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldShutdownClient(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT);
@ -159,29 +170,39 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
@Test
public void shouldReplaceThreads() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReplaceThreads(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
testReplaceThreads(2);
}
@Test
public void shouldReplaceThreadsWithoutJavaHandler() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReplaceThreadsWithoutJavaHandler(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown"));
testReplaceThreads(2);
}
@Test
public void shouldReplaceSingleThread() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldReplaceSingleThread(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
testReplaceThreads(1);
}
@Test
public void shouldShutdownMultipleThreadApplication() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldShutdownMultipleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
testShutdownApplication(2);
}
@Test
public void shouldShutdownSingleThreadApplication() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldShutdownSingleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
testShutdownApplication(1);
}
@ -212,8 +233,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
@Test
public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
builder.addGlobalStore(
new KeyValueStoreBuilder<>(
Stores.persistentKeyValueStore("globalStore"),
@ -239,8 +262,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
@Test
public void shouldEmitSameRecordAfterFailover() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldEmitSameRecordAfterFailover(final boolean streamsRebalanceProtocolEnabled) throws Exception {
properties = basicProps(streamsRebalanceProtocolEnabled);
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

View File

@ -67,7 +67,6 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.query.FailureReason;
@ -518,7 +517,7 @@ public class KafkaStreams implements AutoCloseable {
break;
case SHUTDOWN_CLIENT:
log.error(
"Encountered the following exception during processing and the registered exception handler" +
"Encountered the following exception during processing and the registered exception handler " +
"opted to {}. The streams client is going to shut down now.",
action,
throwable
@ -542,7 +541,7 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
break;
}
processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
processStreamThread(StreamThread::sendShutdownRequest);
log.error("Encountered the following exception during processing " +
"and sent shutdown request for the entire application.", throwable);
break;

View File

@ -39,7 +39,9 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -1079,8 +1081,9 @@ public class StreamThread extends Thread implements ProcessingThread {
shutdownErrorHook.run();
}
public void sendShutdownRequest(final AssignorError assignorError) {
assignmentErrorCode.set(assignorError.code());
public void sendShutdownRequest() {
assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
streamsRebalanceData.ifPresent(StreamsRebalanceData::requestShutdown);
}
private void handleTaskMigrated(final TaskMigratedException e) {
@ -1486,9 +1489,10 @@ public class StreamThread extends Thread implements ProcessingThread {
public void handleStreamsRebalanceData() {
if (streamsRebalanceData.isPresent()) {
if (streamsRebalanceData.get().shutdownRequested()) {
assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
shutdownErrorHook.run();
}
}
}
}