KAFKA-17850: fix leaking internal exception in state manager (#17711)

Following KIP-1033, a FailedProcessingException is passed to the Streams-specific uncaught exception handler.

The goal of this PR is to unwrap a FailedProcessingException into a StreamsException when an exception occurs during the flushing or closing of a store.
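
The change applies the same pattern at each call site in ProcessorStateManager: if the caught exception is the internal FailedProcessingException wrapper, only its cause is re-wrapped into a ProcessorStateException, so the wrapper never reaches user code. The following is a minimal sketch of that mapping, shown for the flush path; the class and method names (UnwrapSketch, wrapFlushError) are illustrative only, and the real code additionally tracks the first failure across all stores and logs it:

    import org.apache.kafka.streams.errors.ProcessorStateException;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.errors.internals.FailedProcessingException;
    import org.apache.kafka.streams.processor.StateStore;

    import static java.lang.String.format;

    final class UnwrapSketch {
        // Maps an exception caught while flushing a store to the exception
        // that should be surfaced to the uncaught exception handler.
        static RuntimeException wrapFlushError(final StateStore store,
                                               final String logPrefix,
                                               final RuntimeException exception) {
            if (exception instanceof FailedProcessingException) {
                // internal KIP-1033 wrapper: keep only its cause in the chain
                return new ProcessorStateException(
                    format("%sFailed to flush state store %s", logPrefix, store.name()),
                    exception.getCause());
            } else if (exception instanceof StreamsException) {
                // already a Streams-specific exception: pass it through unwrapped
                return exception;
            } else {
                // anything else is wrapped together with the store context
                return new ProcessorStateException(
                    format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
            }
        }
    }

The same mapping is applied in flushCache() and close(), with the message adjusted to "Failed to flush cache of store" and "Failed to close state store" respectively.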

Reviewer: Bruno Cadonna <cadonna@apache.org>
Sebastien Viale 2024-11-19 10:51:07 +01:00 committed by GitHub
parent 389f96aabd
commit 615c8c0e11
4 changed files with 224 additions and 7 deletions

ProcessingExceptionHandlerIntegrationTest.java

@@ -26,6 +26,8 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -108,6 +110,52 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldFailWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsFail() {
+        final List<KeyValue<String, String>> events = Arrays.asList(
+            new KeyValue<>("ID123-1", "ID123-A1"),
+            new KeyValue<>("ID123-1", "ID123-A2"),
+            new KeyValue<>("ID123-1", "ID123-A3"),
+            new KeyValue<>("ID123-1", "ID123-A4")
+        );
+
+        final List<KeyValueTimestamp<String, String>> expectedProcessedRecords = Arrays.asList(
+            new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli())
+        );
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .count()
+            .toStream()
+            .mapValues(value -> value.toString())
+            .process(runtimeErrorProcessorSupplierMock())
+            .process(processor);
+
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailProcessingExceptionHandler.class);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
+            final StreamsException exception = assertThrows(StreamsException.class,
+                () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO));
+
+            assertTrue(exception.getMessage().contains("Failed to flush cache of store KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
+            assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size());
+            assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
+
+            final MetricName dropTotal = droppedRecordsTotalMetric();
+            final MetricName dropRate = droppedRecordsRateMetric();
+            assertEquals(0.0, driver.metrics().get(dropTotal).metricValue());
+            assertEquals(0.0, driver.metrics().get(dropRate).metricValue());
+        }
+    }
+
     @Test
     public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() {
         final List<KeyValue<String, String>> events = Arrays.asList(
@@ -153,6 +201,50 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldContinueWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsContinue() {
+        final List<KeyValue<String, String>> events = Arrays.asList(
+            new KeyValue<>("ID123-1", "ID123-A1"),
+            new KeyValue<>("ID123-1", "ID123-A2"),
+            new KeyValue<>("ID123-1", "ID123-A3"),
+            new KeyValue<>("ID123-1", "ID123-A4")
+        );
+
+        final List<KeyValueTimestamp<String, String>> expectedProcessedRecords = Arrays.asList(
+            new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "4", TIMESTAMP.toEpochMilli())
+        );
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey()
+            .count()
+            .toStream()
+            .mapValues(value -> value.toString())
+            .process(runtimeErrorProcessorSupplierMock())
+            .process(processor);
+
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueProcessingExceptionHandler.class);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
+            inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
+
+            assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size());
+            assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
+
+            final MetricName dropTotal = droppedRecordsTotalMetric();
+            final MetricName dropRate = droppedRecordsRateMetric();
+            assertEquals(1.0, driver.metrics().get(dropTotal).metricValue());
+            assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0);
+        }
+    }
+
     @Test
     public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() {
         final KeyValue<String, String> event = new KeyValue<>("ID123-1", "ID123-A1");
@@ -377,7 +469,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         return () -> new ContextualProcessor<String, String, String, String>() {
             @Override
             public void process(final Record<String, String> record) {
-                if (record.key().contains("ERR")) {
+                if (record.key().contains("ERR") || record.value().equals("3")) {
                     throw new RuntimeException("Exception should be handled by processing exception handler");
                 }

ProcessorStateManager.java

@@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@ -538,13 +539,20 @@ public class ProcessorStateManager implements StateManager {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()),
exception.getCause());
else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
log.error("Failed to flush state store {}: ", store.name(), firstException);
} else {
log.error("Failed to flush state store {}: ", store.name(), exception);
}
log.error("Failed to flush state store {}: ", store.name(), exception);
}
}
}
@@ -573,7 +581,12 @@ public class ProcessorStateManager implements StateManager {
             } catch (final RuntimeException exception) {
                 if (firstException == null) {
                     // do NOT wrap the error if it is actually caused by Streams itself
-                    if (exception instanceof StreamsException) {
+                    // in case of a FailedProcessingException, do not keep the failed processing exception in the stack trace
+                    if (exception instanceof FailedProcessingException) {
+                        firstException = new ProcessorStateException(
+                            format("%sFailed to flush cache of store %s", logPrefix, store.name()),
+                            exception.getCause());
+                    } else if (exception instanceof StreamsException) {
                         firstException = exception;
                     } else {
                         firstException = new ProcessorStateException(
@@ -581,8 +594,10 @@ public class ProcessorStateManager implements StateManager {
                             exception
                         );
                     }
+                    log.error("Failed to flush cache of store {}: ", store.name(), firstException);
+                } else {
+                    log.error("Failed to flush cache of store {}: ", store.name(), exception);
                 }
-                log.error("Failed to flush cache of store {}: ", store.name(), exception);
             }
         }
     }
@@ -618,13 +633,20 @@ public class ProcessorStateManager implements StateManager {
             } catch (final RuntimeException exception) {
                 if (firstException == null) {
                     // do NOT wrap the error if it is actually caused by Streams itself
-                    if (exception instanceof StreamsException)
+                    // in case of a FailedProcessingException, do not keep the failed processing exception in the stack trace
+                    if (exception instanceof FailedProcessingException)
+                        firstException = new ProcessorStateException(
+                            format("%sFailed to close state store %s", logPrefix, store.name()),
+                            exception.getCause());
+                    else if (exception instanceof StreamsException)
                         firstException = exception;
                     else
                         firstException = new ProcessorStateException(
                             format("%sFailed to close state store %s", logPrefix, store.name()), exception);
+                    log.error("Failed to close state store {}: ", store.name(), firstException);
+                } else {
+                    log.error("Failed to close state store {}: ", store.name(), exception);
                 }
-                log.error("Failed to close state store {}: ", store.name(), exception);
             }
         }

ProcessorStateManagerTest.java

@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
+import org.apache.kafka.test.MockCachedKeyValueStore;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.TestUtils;
@@ -57,6 +59,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -771,6 +774,64 @@ public class ProcessorStateManagerTest {
         assertEquals(exception, thrown);
     }
 
+    @Test
+    public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException() {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
+        final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void flush() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flush);
+        assertEquals(exception, thrown.getCause());
+        assertFalse(thrown.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionOnFlushCacheIfStoreThrowsAFailedProcessingException() {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
+        final MockCachedKeyValueStore stateStore = new MockCachedKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void flushCache() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flushCache);
+        assertEquals(exception, thrown.getCause());
+        assertFalse(thrown.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAFailedProcessingException() {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
+        final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void close() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::close);
+        assertEquals(exception, thrown.getCause());
+        assertFalse(thrown.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+    }
+
     @Test
     public void shouldThrowIfRestoringUnregisteredStore() {
         final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE);
MockCachedKeyValueStore.java (new file)

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+
+public class MockCachedKeyValueStore extends MockKeyValueStore implements CachedStateStore<Object, Object> {
+
+    public MockCachedKeyValueStore(final String name, final boolean persistent) {
+        super(name, persistent);
+    }
+
+    @Override
+    public boolean setFlushListener(final CacheFlushListener<Object, Object> listener, final boolean sendOldValues) {
+        return false;
+    }
+
+    @Override
+    public void flushCache() {
+    }
+
+    @Override
+    public void clearCache() {
+    }
+}