mirror of https://github.com/apache/kafka.git
KAFKA-19190: Handle shutdown application correctly (#19544)
If the streams rebalance protocol is enabled in StreamsUncaughtExceptionHandlerIntegrationTest, the streams application does not shut down correctly upon error. There are two causes. First, the SHUTDOWN_APPLICATION code is sometimes sent only with the leave heartbeat, and that case was not handled broker-side. Second, the SHUTDOWN_APPLICATION code was not handled properly client-side at all.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Bill Bejeck <bill@confluent.io>, PoAn Yang <payang@apache.org>
This commit is contained in:
parent 2ce7c44612
commit 732ed0696b
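In short: the heartbeat manager now always forwards the group statuses to StreamsRebalanceData, and the stream thread reacts to a SHUTDOWN_APPLICATION status by running its shutdown hook. Below is a minimal, self-contained sketch of that flow with the real Kafka classes stubbed out; the class and method names follow the diff that follows, while the concrete status-code value and the wiring are simplified assumptions of this sketch:

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownStatusFlowSketch {

    // Stand-in for StreamsGroupHeartbeatResponseData.Status.
    record Status(int statusCode, String statusDetail) { }

    // SHUTDOWN_APPLICATION's code as used via StreamsGroupHeartbeatResponse.Status;
    // the concrete value here is a placeholder, not the real wire constant.
    static final int SHUTDOWN_APPLICATION_CODE = 0;

    // Stand-in for StreamsRebalanceData: the heartbeat thread publishes the
    // latest statuses, the stream thread polls them.
    static class RebalanceData {
        private final AtomicReference<List<Status>> statuses = new AtomicReference<>(List.of());
        void setStatuses(final List<Status> s) { statuses.set(s); }
        List<Status> statuses() { return statuses.get(); }
    }

    // What StreamsGroupHeartbeatRequestManager now does on every successful
    // heartbeat: forward the statuses, even when the list is empty.
    static void onHeartbeatResponse(final RebalanceData data, final List<Status> statuses) {
        if (statuses != null) {
            data.setStatuses(statuses);
        }
    }

    // What StreamThread#handleStreamsRebalanceData now does: react to
    // SHUTDOWN_APPLICATION by running the shutdown hook.
    static void handleStreamsRebalanceData(final RebalanceData data, final Runnable shutdownErrorHook) {
        for (final Status status : data.statuses()) {
            if (status.statusCode() == SHUTDOWN_APPLICATION_CODE) {
                shutdownErrorHook.run();
            }
        }
    }

    public static void main(final String[] args) {
        final RebalanceData data = new RebalanceData();
        onHeartbeatResponse(data, List.of(
            new Status(SHUTDOWN_APPLICATION_CODE, "member requested application shutdown")));
        handleStreamsRebalanceData(data, () -> System.out.println("shutting down all clients"));
    }
}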
StreamsGroupHeartbeatRequestManager.java:

@@ -521,11 +521,14 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
         }
 
         List<StreamsGroupHeartbeatResponseData.Status> statuses = data.status();
-        if (statuses != null && !statuses.isEmpty()) {
-            String statusDetails = statuses.stream()
-                .map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
-                .collect(Collectors.joining(", "));
-            logger.warn("Membership is in the following statuses: {}", statusDetails);
+        if (statuses != null) {
+            streamsRebalanceData.setStatuses(statuses);
+            if (!statuses.isEmpty()) {
+                String statusDetails = statuses.stream()
+                    .map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
+                    .collect(Collectors.joining(", "));
+                logger.warn("Membership is in the following statuses: {}", statusDetails);
+            }
         }
 
         membershipManager.onHeartbeatSuccess(response);
StreamsRebalanceData.java:

@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 
 import java.util.Collection;
 import java.util.Collections;

@@ -300,6 +301,8 @@ public class StreamsRebalanceData {
 
     private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
 
+    private final AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new AtomicReference<>(List.of());
+
     public StreamsRebalanceData(final UUID processId,
                                 final Optional<HostInfo> endpoint,
                                 final Map<String, Subtopology> subtopologies,

@@ -346,11 +349,24 @@ public class StreamsRebalanceData {
         return partitionsByHost.get();
     }
 
+    /** For the current stream thread to request a shutdown of all Streams clients belonging to the same application. */
     public void requestShutdown() {
         shutdownRequested.set(true);
     }
 
+    /** True if the current stream thread requested a shutdown of all Streams clients belonging to the same application. */
     public boolean shutdownRequested() {
         return shutdownRequested.get();
     }
+
+    /** Updated whenever the status of the streams group is updated. */
+    public void setStatuses(final List<StreamsGroupHeartbeatResponseData.Status> s) {
+        statuses.set(s);
+    }
+
+    /** For communicating the current status of the group to the stream thread */
+    public List<StreamsGroupHeartbeatResponseData.Status> statuses() {
+        return statuses.get();
+    }
+
 }
StreamsRebalanceDataTest.java:

@@ -420,4 +420,21 @@ public class StreamsRebalanceDataTest {
 
         assertFalse(streamsRebalanceData.shutdownRequested());
     }
+
+    @Test
+    public void streamsRebalanceDataShouldBeConstructedWithEmptyStatuses() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = Map.of();
+        final Map<String, String> clientTags = Map.of("clientTag1", "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+
+        assertTrue(streamsRebalanceData.statuses().isEmpty());
+    }
 }
GroupMetadataManager.java:

@@ -3979,9 +3979,13 @@ public class GroupMetadataManager {
        String groupId,
        String instanceId,
        String memberId,
-       int memberEpoch
+       int memberEpoch,
+       boolean shutdownApplication
    ) throws ApiException {
        StreamsGroup group = streamsGroup(groupId);
+       if (shutdownApplication) {
+           group.setShutdownRequestMemberId(memberId);
+       }
        StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
            .setMemberId(memberId)
            .setMemberEpoch(memberEpoch);

@@ -4734,7 +4738,8 @@ public class GroupMetadataManager {
                    request.groupId(),
                    request.instanceId(),
                    request.memberId(),
-                   request.memberEpoch()
+                   request.memberEpoch(),
+                   request.shutdownApplication()
                );
            } else {
                return streamsGroupHeartbeat(
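A hedged sketch of the broker-side bookkeeping these hunks add: on a leave heartbeat with shutdownApplication=true, the group records which member asked for the shutdown, so that later heartbeats from other members can be answered with a SHUTDOWN_APPLICATION status. Only setShutdownRequestMemberId appears in the diff; the StreamsGroup stand-in and the status-detail helper below are simplified illustrations, not the real coordinator code:

import java.util.Optional;

public class BrokerShutdownBookkeepingSketch {

    // Simplified stand-in for the coordinator's StreamsGroup; only the
    // shutdown bookkeeping is modeled here.
    static class StreamsGroup {
        private Optional<String> shutdownRequestMemberId = Optional.empty();

        // Mirrors group.setShutdownRequestMemberId(memberId) from the diff.
        // Keeping only the first requester is an assumption of this sketch.
        void setShutdownRequestMemberId(final String memberId) {
            if (shutdownRequestMemberId.isEmpty()) {
                shutdownRequestMemberId = Optional.of(memberId);
            }
        }

        Optional<String> shutdownRequestMemberId() {
            return shutdownRequestMemberId;
        }
    }

    // Hypothetical helper: how a later heartbeat response for another member
    // could carry the status detail asserted in the test below.
    static Optional<String> shutdownStatusDetail(final StreamsGroup group) {
        return group.shutdownRequestMemberId().map(memberId -> String.format(
            "Streams group member %s encountered a fatal error and requested a shutdown for the entire application.",
            memberId));
    }

    public static void main(final String[] args) {
        final StreamsGroup group = new StreamsGroup();
        group.setShutdownRequestMemberId("member-1"); // leave heartbeat with shutdownApplication=true
        shutdownStatusDetail(group).ifPresent(System.out::println);
    }
}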
GroupMetadataManagerTest.java:

@@ -16371,6 +16371,97 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(List.of(), result2.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .build())
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
+                ))
+            )
+            .build();
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result1 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setShutdownApplication(true)
+        );
+
+        String statusDetail = String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", memberId1);
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+            result1.response().data()
+        );
+        assertRecordsEquals(
+            List.of(
+                StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
+                StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
+                StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1),
+                StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11)
+            ),
+            result1.records()
+        );
+
+        for (CoordinatorRecord record : result1.records()) {
+            context.replay(record);
+        }
+        assignor.prepareGroupAssignment(
+            Map.of(memberId1, TasksTuple.EMPTY)
+        );
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result2 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+        );
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(12)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of(
+                    new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+                        .setStatusDetail(statusDetail)
+                )),
+            result2.response().data()
+        );
+    }
+
     @Test
     public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
         String groupId = "fooup";
StreamsUncaughtExceptionHandlerIntegrationTest.java:

@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;

@@ -52,9 +53,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;

@@ -62,6 +64,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -86,7 +89,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private static final long NOW = Instant.now().toEpochMilli();
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
 
     @BeforeAll
     public static void startCluster() throws IOException {

@@ -111,7 +114,13 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
 
     private Properties properties;
 
-    private Properties basicProps() {
+    private Properties basicProps(final boolean streamsRebalanceProtocolEnabled) {
+        final String protocol;
+        if (streamsRebalanceProtocolEnabled) {
+            protocol = GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault());
+        } else {
+            protocol = GroupProtocol.CLASSIC.name().toLowerCase(Locale.getDefault());
+        }
         return mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),

@@ -120,7 +129,8 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
                 mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
                 mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
                 mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10000)
+                mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10000),
+                mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol)
             )
         );
     }

@@ -136,7 +146,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, inputTopic2, outputTopic, outputTopic2);
         final KStream<String, String> stream = builder.stream(inputTopic);
         stream.process(() -> new ShutdownProcessor<>(processorValueCollector), Named.as("process"));
-        properties = basicProps();
     }
 
     @AfterEach

@@ -144,8 +153,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         purgeLocalStreamsState(properties);
     }
 
-    @Test
-    public void shouldShutdownClient() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutdownClient(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
 
             kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT);

@@ -159,29 +170,39 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldReplaceThreads() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReplaceThreads(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         testReplaceThreads(2);
     }
 
-    @Test
-    public void shouldReplaceThreadsWithoutJavaHandler() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReplaceThreadsWithoutJavaHandler(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown"));
         testReplaceThreads(2);
     }
 
-    @Test
-    public void shouldReplaceSingleThread() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldReplaceSingleThread(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         testReplaceThreads(1);
     }
 
-    @Test
-    public void shouldShutdownMultipleThreadApplication() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutdownMultipleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         testShutdownApplication(2);
     }
 
-    @Test
-    public void shouldShutdownSingleThreadApplication() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutdownSingleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         testShutdownApplication(1);
     }
 

@@ -212,8 +233,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         builder.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.persistentKeyValueStore("globalStore"),

@@ -239,8 +262,10 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldEmitSameRecordAfterFailover() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldEmitSameRecordAfterFailover(final boolean streamsRebalanceProtocolEnabled) throws Exception {
+        properties = basicProps(streamsRebalanceProtocolEnabled);
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
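For reference, the same protocol opt-in from an application's point of view, mirroring what basicProps(true) assembles in the test above; a minimal sketch in which the application id and bootstrap server are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class StreamsProtocolConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Opt into the streams rebalance protocol; the test's other branch uses "classic".
        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
        System.out.println(props);
    }
}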
KafkaStreams.java:

@@ -67,7 +67,6 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
 import org.apache.kafka.streams.query.FailureReason;

@@ -518,7 +517,7 @@ public class KafkaStreams implements AutoCloseable {
                 break;
             case SHUTDOWN_CLIENT:
                 log.error(
-                    "Encountered the following exception during processing and the registered exception handler" +
+                    "Encountered the following exception during processing and the registered exception handler " +
                         "opted to {}. The streams client is going to shut down now.",
                     action,
                     throwable

@@ -542,7 +541,7 @@ public class KafkaStreams implements AutoCloseable {
                 closeToError();
                 break;
         }
-        processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
+        processStreamThread(StreamThread::sendShutdownRequest);
         log.error("Encountered the following exception during processing " +
             "and sent shutdown request for the entire application.", throwable);
         break;
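As a usage reminder, this is the handler response that leads into the processStreamThread(StreamThread::sendShutdownRequest) branch above; a minimal sketch with a placeholder topology and configs:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ShutdownApplicationHandlerSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) {
            // Returning SHUTDOWN_APPLICATION from the handler is what reaches the
            // KafkaStreams branch above and, on each stream thread, sendShutdownRequest().
            kafkaStreams.setUncaughtExceptionHandler(exception ->
                StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
        }
    }
}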
StreamThread.java:

@@ -39,7 +39,9 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;

@@ -1079,8 +1081,9 @@ public class StreamThread extends Thread implements ProcessingThread {
         shutdownErrorHook.run();
     }
 
-    public void sendShutdownRequest(final AssignorError assignorError) {
-        assignmentErrorCode.set(assignorError.code());
+    public void sendShutdownRequest() {
+        assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
+        streamsRebalanceData.ifPresent(StreamsRebalanceData::requestShutdown);
     }
 
     private void handleTaskMigrated(final TaskMigratedException e) {

@@ -1486,9 +1489,10 @@ public class StreamThread extends Thread implements ProcessingThread {
 
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
-            if (streamsRebalanceData.get().shutdownRequested()) {
-                assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
+            for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
+                if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
+                    shutdownErrorHook.run();
+                }
             }
         }
     }