diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 9b848ab9005..13e291e887c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.ErrorHandlerContext; +import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.Consumed; @@ -108,6 +110,52 @@ public class ProcessingExceptionHandlerIntegrationTest { } } + @Test + public void shouldFailWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsFail() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-1", "ID123-A2"), + new KeyValue<>("ID123-1", "ID123-A3"), + new KeyValue<>("ID123-1", "ID123-A4") + ); + + final List> expectedProcessedRecords = Arrays.asList( + new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()), + new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli()) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .count() + .toStream() + .mapValues(value -> value.toString()) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailProcessingExceptionHandler.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + + final StreamsException exception = assertThrows(StreamsException.class, + () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO)); + + assertTrue(exception.getMessage().contains("Failed to flush cache of store KSTREAM-AGGREGATE-STATE-STORE-0000000001")); + assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(0.0, driver.metrics().get(dropTotal).metricValue()); + assertEquals(0.0, driver.metrics().get(dropRate).metricValue()); + } + } + @Test public void shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() { final List> events = Arrays.asList( @@ -153,6 +201,50 @@ public class ProcessingExceptionHandlerIntegrationTest { } } + @Test + public void shouldContinueWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsContinue() { + final List> events = Arrays.asList( + new KeyValue<>("ID123-1", "ID123-A1"), + new KeyValue<>("ID123-1", "ID123-A2"), + new KeyValue<>("ID123-1", "ID123-A3"), + new KeyValue<>("ID123-1", "ID123-A4") + ); + + final List> expectedProcessedRecords = Arrays.asList( + new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()), + new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli()), + new KeyValueTimestamp<>("ID123-1", "4", TIMESTAMP.toEpochMilli()) + ); + + final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .count() + .toStream() + .mapValues(value -> value.toString()) + .process(runtimeErrorProcessorSupplierMock()) + .process(processor); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueProcessingExceptionHandler.class); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer()); + inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO); + + assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size()); + assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed()); + + final MetricName dropTotal = droppedRecordsTotalMetric(); + final MetricName dropRate = droppedRecordsRateMetric(); + + assertEquals(1.0, driver.metrics().get(dropTotal).metricValue()); + assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 0.0); + } + } + @Test public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() { final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); @@ -377,7 +469,7 @@ public class ProcessingExceptionHandlerIntegrationTest { return () -> new ContextualProcessor() { @Override public void process(final Record record) { - if (record.key().contains("ERR")) { + if (record.key().contains("ERR") || record.value().equals("3")) { throw new RuntimeException("Exception should be handled by processing exception handler"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 30334abc53e..99092dbb4fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.CommitCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -538,13 +539,20 @@ public class ProcessorStateManager implements StateManager { } catch (final RuntimeException exception) { if (firstException == null) { // do NOT wrap the error if it is actually caused by Streams itself - if (exception instanceof StreamsException) + // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace + if (exception instanceof FailedProcessingException) + firstException = new ProcessorStateException( + format("%sFailed to flush state store %s", logPrefix, store.name()), + exception.getCause()); + else if (exception instanceof StreamsException) firstException = exception; else firstException = new ProcessorStateException( format("%sFailed to flush state store %s", logPrefix, store.name()), exception); + log.error("Failed to flush state store {}: ", store.name(), firstException); + } else { + log.error("Failed to flush state store {}: ", store.name(), exception); } - log.error("Failed to flush state store {}: ", store.name(), exception); } } } @@ -573,7 +581,12 @@ public class ProcessorStateManager implements StateManager { } catch (final RuntimeException exception) { if (firstException == null) { // do NOT wrap the error if it is actually caused by Streams itself - if (exception instanceof StreamsException) { + // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace + if (exception instanceof FailedProcessingException) { + firstException = new ProcessorStateException( + format("%sFailed to flush cache of store %s", logPrefix, store.name()), + exception.getCause()); + } else if (exception instanceof StreamsException) { firstException = exception; } else { firstException = new ProcessorStateException( @@ -581,8 +594,10 @@ public class ProcessorStateManager implements StateManager { exception ); } + log.error("Failed to flush cache of store {}: ", store.name(), firstException); + } else { + log.error("Failed to flush cache of store {}: ", store.name(), exception); } - log.error("Failed to flush cache of store {}: ", store.name(), exception); } } } @@ -618,13 +633,20 @@ public class ProcessorStateManager implements StateManager { } catch (final RuntimeException exception) { if (firstException == null) { // do NOT wrap the error if it is actually caused by Streams itself - if (exception instanceof StreamsException) + // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace + if (exception instanceof FailedProcessingException) + firstException = new ProcessorStateException( + format("%sFailed to close state store %s", logPrefix, store.name()), + exception.getCause()); + else if (exception instanceof StreamsException) firstException = exception; else firstException = new ProcessorStateException( format("%sFailed to close state store %s", logPrefix, store.name()), exception); + log.error("Failed to close state store {}: ", store.name(), firstException); + } else { + log.error("Failed to close state store {}: ", store.name(), exception); } - log.error("Failed to close state store {}: ", store.name(), exception); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index a46c54ee05b..da6c418bd1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; +import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.CommitCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.CachedStateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.StoreQueryUtils; +import org.apache.kafka.test.MockCachedKeyValueStore; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.TestUtils; @@ -57,6 +59,7 @@ import java.io.FileWriter; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -771,6 +774,64 @@ public class ProcessorStateManagerTest { assertEquals(exception, thrown); } + @Test + public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException() { + final RuntimeException exception = new RuntimeException("KABOOM!"); + final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); + final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { + @Override + public void flush() { + throw new FailedProcessingException("processor", exception); + } + }; + stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); + + final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flush); + assertEquals(exception, thrown.getCause()); + assertFalse(exception.getMessage().contains("FailedProcessingException")); + assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch( + element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName()))); + } + + @Test + public void shouldThrowProcessorStateExceptionOnFlushCacheIfStoreThrowsAFailedProcessingException() { + final RuntimeException exception = new RuntimeException("KABOOM!"); + final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); + final MockCachedKeyValueStore stateStore = new MockCachedKeyValueStore(persistentStoreName, true) { + @Override + public void flushCache() { + throw new FailedProcessingException("processor", exception); + } + }; + stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); + + final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flushCache); + assertEquals(exception, thrown.getCause()); + assertFalse(exception.getMessage().contains("FailedProcessingException")); + assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch( + element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName()))); + + } + + @Test + public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAFailedProcessingException() { + final RuntimeException exception = new RuntimeException("KABOOM!"); + final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); + final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { + @Override + public void close() { + throw new FailedProcessingException("processor", exception); + } + }; + stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); + + final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::close); + assertEquals(exception, thrown.getCause()); + assertFalse(exception.getMessage().contains("FailedProcessingException")); + assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch( + element -> element.getClassName().contains(FailedProcessingException.class.getSimpleName()))); + } + @Test public void shouldThrowIfRestoringUnregisteredStore() { final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); diff --git a/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java new file mode 100644 index 00000000000..ee55757f7bf --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.state.internals.CacheFlushListener; +import org.apache.kafka.streams.state.internals.CachedStateStore; + +public class MockCachedKeyValueStore extends MockKeyValueStore implements CachedStateStore { + + public MockCachedKeyValueStore(String name, boolean persistent) { + super(name, persistent); + } + + @Override + public boolean setFlushListener(CacheFlushListener listener, boolean sendOldValues) { + return false; + } + + @Override + public void flushCache() { + + } + + @Override + public void clearCache() { + + } +}