KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432)

This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

This PR exposes the new ErrorHandlerContext as a parameter to the deserialization exception handlers and deprecates the previous handle signature.

Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>

Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Sebastien Viale, 2024-07-30 05:33:33 +02:00 (committed by GitHub)
parent b6d5f0556c
commit faaef527d7
7 changed files with 205 additions and 17 deletions
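
For context, the snippet below is a minimal sketch of a custom handler written against the new signature introduced by this PR. The class name LogAndSkipHandler and its log message are illustrative assumptions, not code from this change; the ErrorHandlerContext accessors it uses (taskId() together with the record's own topic/partition/offset) are the ones exercised in the diff below.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

// Hypothetical example handler -- not part of this PR.
public class LogAndSkipHandler implements DeserializationExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndSkipHandler.class);

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Record metadata is read from the new ErrorHandlerContext rather than the deprecated ProcessorContext.
        log.warn("Skipping record that failed deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
                 context.taskId(), record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure
    }
}

Such a handler would be registered through the existing default.deserialization.exception.handler config; only the new ErrorHandlerContext overload needs to be implemented, because the deprecated ProcessorContext overload now has a default implementation (see the first file below).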

File: DeserializationExceptionHandler.java

@@ -16,9 +16,9 @@
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
@@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
* @deprecated Since 3.9. Please use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead
*/
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception);
@Deprecated
default DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
throw new UnsupportedOperationException();
}
/**
* Inspect a record and the exception received.
*
* @param context error handler context
* @param record record that failed deserialization
* @param exception the actual exception
*/
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception);
}
/**
* Enumeration that describes the response from the exception handler.

File: LogAndContinueExceptionHandler.java

@@ -32,6 +32,7 @@ import java.util.Map;
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
@@ -45,6 +46,19 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore

File: LogAndFailExceptionHandler.java

@@ -33,6 +33,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
@Override
@Deprecated
public DeserializationHandlerResponse handle(final ProcessorContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
@@ -45,6 +46,19 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
return DeserializationHandlerResponse.FAIL;
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Exception caught during Deserialization, " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
return DeserializationHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore

File: DefaultErrorHandlerContext.java

@@ -18,8 +18,11 @@ package org.apache.kafka.streams.errors.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Optional;
/**
* Default implementation of {@link ErrorHandlerContext} that provides access to the metadata of the record that caused the error.
*/
@@ -30,8 +33,10 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private ProcessorContext processorContext;
public DefaultErrorHandlerContext(final String topic,
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final String topic,
final int partition,
final long offset,
final Headers headers,
@@ -43,6 +48,7 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
this.headers = headers;
this.processorNodeId = processorNodeId;
this.taskId = taskId;
this.processorContext = processorContext;
}
@Override
@@ -74,4 +80,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
public TaskId taskId() {
return taskId;
}
public Optional<ProcessorContext> processorContext() {
return Optional.ofNullable(processorContext);
}
}

File: ProcessorNode.java

@@ -205,6 +205,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
throw e;
} catch (final Exception e) {
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null,
internalProcessorContext.topic(),
internalProcessorContext.partition(),
internalProcessorContext.offset(),

File: RecordDeserializer.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class RecordDeserializer {
Optional.empty()
);
} catch (final Exception deserializationException) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor);
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
}
}
@@ -80,12 +81,27 @@ public class RecordDeserializer {
final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);
}
public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler,
final ProcessorContext<?, ?> processorContext,
final Exception deserializationException,
final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor,
final String sourceNodeName) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response;
try {
response = deserializationExceptionHandler.handle(
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
(InternalProcessorContext<?, ?>) processorContext,
rawRecord,
deserializationException);
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
rawRecord.headers(),
sourceNodeName,
processorContext.taskId());
response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException);
} catch (final Exception fatalUserException) {
log.error(
"Deserialization error callback failed after deserialization error for record {}",

File: RecordDeserializerTest.java

@@ -24,16 +24,28 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class RecordDeserializerTest {
private final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
private final String sourceNodeName = "source-node";
private final TaskId taskId = new TaskId(0, 0);
private final RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
1,
1,
@@ -46,13 +58,17 @@ public class RecordDeserializerTest {
headers,
Optional.empty());
private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();
@Test
public void shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
final RecordDeserializer recordDeserializer = new RecordDeserializer(
new TheSourceNode(
sourceNodeName,
false,
false,
"key", "value"
"key",
"value"
),
null,
new LogContext(),
@@ -69,17 +85,82 @@ public class RecordDeserializerTest {
assertEquals(rawRecord.headers(), record.headers());
}
@ParameterizedTest
@CsvSource({
"true, true",
"true, false",
"false, true",
})
public void shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final boolean keyThrowsException,
final boolean valueThrowsException) {
final RecordDeserializer recordDeserializer = new RecordDeserializer(
new TheSourceNode(
sourceNodeName,
keyThrowsException,
valueThrowsException,
"key",
"value"
),
new DeserializationExceptionHandlerMock(
DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL,
rawRecord,
sourceNodeName,
taskId
),
new LogContext(),
new Metrics().sensor("dropped-records")
);
final StreamsException e = assertThrows(StreamsException.class, () -> recordDeserializer.deserialize(context, rawRecord));
assertEquals(e.getMessage(), "Deserialization exception handler is set "
+ "to fail upon a deserialization error. "
+ "If you would rather have the streaming pipeline "
+ "continue after a deserialization error, please set the "
+ "default.deserialization.exception.handler appropriately.");
}
@ParameterizedTest
@CsvSource({
"true, true",
"true, false",
"false, true"
})
public void shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final boolean keyThrowsException,
final boolean valueThrowsException) {
final RecordDeserializer recordDeserializer = new RecordDeserializer(
new TheSourceNode(
sourceNodeName,
keyThrowsException,
valueThrowsException,
"key",
"value"
),
new DeserializationExceptionHandlerMock(
DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE,
rawRecord,
sourceNodeName,
taskId
),
new LogContext(),
new Metrics().sensor("dropped-records")
);
final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(context, rawRecord);
assertNull(record);
}
static class TheSourceNode extends SourceNode<Object, Object> {
private final boolean keyThrowsException;
private final boolean valueThrowsException;
private final Object key;
private final Object value;
TheSourceNode(final boolean keyThrowsException,
TheSourceNode(final String name,
final boolean keyThrowsException,
final boolean valueThrowsException,
final Object key,
final Object value) {
super("", null, null);
super(name, null, null);
this.keyThrowsException = keyThrowsException;
this.valueThrowsException = valueThrowsException;
this.key = key;
@@ -89,7 +170,7 @@ public class RecordDeserializerTest {
@Override
public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
if (keyThrowsException) {
throw new RuntimeException();
throw new RuntimeException("KABOOM!");
}
return key;
}
@@ -97,10 +178,46 @@ public class RecordDeserializerTest {
@Override
public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
if (valueThrowsException) {
throw new RuntimeException();
throw new RuntimeException("KABOOM!");
}
return value;
}
}
public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler {
private final DeserializationHandlerResponse response;
private final ConsumerRecord<byte[], byte[]> expectedRecord;
private final String expectedProcessorNodeId;
private final TaskId expectedTaskId;
public DeserializationExceptionHandlerMock(final DeserializationHandlerResponse response,
final ConsumerRecord<byte[], byte[]> record,
final String processorNodeId,
final TaskId taskId) {
this.response = response;
this.expectedRecord = record;
this.expectedProcessorNodeId = processorNodeId;
this.expectedTaskId = taskId;
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
assertEquals(expectedRecord.topic(), context.topic());
assertEquals(expectedRecord.partition(), context.partition());
assertEquals(expectedRecord.offset(), context.offset());
assertEquals(expectedProcessorNodeId, context.processorNodeId());
assertEquals(expectedTaskId, context.taskId());
assertEquals(expectedRecord, record);
assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());
return response;
}
@Override
public void configure(final Map<String, ?> configs) {
// do nothing
}
}
}