KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942)

Implements KIP-1034 to add support of Dead Letter
Queue in Kafka Streams. 

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna
 <cadonna@apache.org>
Co-authored-by: Sebastien Viale <sebastien.viale@michelin.com>
This commit is contained in:
Gasparina Damien 2025-07-21 15:54:40 +02:00 committed by GitHub
parent f52f2b99e5
commit cdc2d957ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1479 additions and 172 deletions

View File

@ -357,7 +357,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertEquals("Fatal user code error in processing error callback", e.getMessage());
assertInstanceOf(NullPointerException.class, e.getCause());
assertEquals("Invalid ProductionExceptionHandler response.", e.getCause().getMessage());
assertEquals("Invalid ProcessingExceptionHandler response.", e.getCause().getMessage());
assertFalse(isExecuted.get());
}
}
@ -524,7 +524,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
throw new RuntimeException("KABOOM!");
}
@ -532,7 +532,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
return null;
}
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
return Response.resume();
}
@Override
@ -543,9 +543,9 @@ public class ProcessingExceptionHandlerIntegrationTest {
public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
return Response.fail();
}
@Override

View File

@ -156,15 +156,15 @@ public class SwallowUnknownTopicErrorIntegrationTest {
public void configure(final Map<String, ?> configs) { }
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
public Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof TimeoutException &&
exception.getCause() != null &&
exception.getCause() instanceof UnknownTopicOrPartitionException) {
return ProductionExceptionHandlerResponse.CONTINUE;
return Response.resume();
}
return ProductionExceptionHandler.super.handle(context, record, exception);
return ProductionExceptionHandler.super.handleError(context, record, exception);
}
}

View File

@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig {
"support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " +
"used. Otherwise, the classic group protocol will be used.";
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG,
0L,

View File

@ -18,38 +18,42 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Map;
import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
/**
* {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
/**
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
*/
@SuppressWarnings("deprecation")
@Deprecated
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return exception instanceof RetriableException ?
ProductionExceptionHandlerResponse.RETRY :
ProductionExceptionHandlerResponse.FAIL;
}
private String deadLetterQueueTopic = null;
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
public Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return exception instanceof RetriableException ?
ProductionExceptionHandlerResponse.RETRY :
ProductionExceptionHandlerResponse.FAIL;
Response.retry() :
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@SuppressWarnings("rawtypes")
@Override
public Response handleSerializationError(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
final SerializationExceptionOrigin origin) {
return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}

View File

@ -17,10 +17,14 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.Collections;
import java.util.List;
/**
* Interface that specifies how an exception from source node deserialization
* (e.g., reading from Kafka) should be handled.
@ -63,16 +67,35 @@ public interface DeserializationExceptionHandler extends Configurable {
* The actual exception.
*
* @return Whether to continue or stop processing.
*
* @deprecated Use {@link #handleError(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@Deprecated
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception);
}
/**
* Inspects a record and the exception received during deserialization.
*
* @param context
* Error handler context.
* @param record
* Record that failed deserialization.
* @param exception
* The actual exception.
*
* @return a {@link Response} object
*/
default Response handleError(final ErrorHandlerContext context, final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
return new Response(Result.from(handle(context, record, exception)), Collections.emptyList());
}
/**
* Enumeration that describes the response from the exception handler.
*/
@Deprecated
enum DeserializationHandlerResponse {
/** Continue processing. */
CONTINUE(0, "CONTINUE"),
@ -95,4 +118,137 @@ public interface DeserializationExceptionHandler extends Configurable {
}
}
/**
* Enumeration that describes the response from the exception handler.
*/
enum Result {
/** Continue processing. */
RESUME(0, "RESUME"),
/** Fail processing. */
FAIL(1, "FAIL");
/**
* An english description for the used option. This is for debugging only and may change.
*/
public final String name;
/**
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;
Result(final int id, final String name) {
this.id = id;
this.name = name;
}
/**
* Converts the deprecated enum DeserializationHandlerResponse into the new Result enum.
*
* @param value the old DeserializationHandlerResponse enum value
* @return a {@link Result} enum value
* @throws IllegalArgumentException if the provided value does not map to a valid {@link Result}
*/
private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) {
switch (value) {
case FAIL:
return Result.FAIL;
case CONTINUE:
return Result.RESUME;
default:
throw new IllegalArgumentException("No Result enum found for old value: " + value);
}
}
}
/**
* Represents the result of handling a deserialization exception.
* <p>
* The {@code Response} class encapsulates a {@link Result},
* indicating whether processing should continue or fail, along with an optional list of
* {@link ProducerRecord} instances to be sent to a dead letter queue.
* </p>
*/
class Response {
private final Result result;
private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
/**
* Constructs a new {@code DeserializationExceptionResponse} object.
*
* @param result the result indicating whether processing should continue or fail;
* must not be {@code null}.
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
*/
private Response(final Result result,
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
this.result = result;
this.deadLetterQueueRecords = deadLetterQueueRecords;
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
*/
public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.FAIL, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
*/
public static Response fail() {
return fail(Collections.emptyList());
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
*/
public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.RESUME, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
*/
public static Response resume() {
return resume(Collections.emptyList());
}
/**
* Retrieves the deserialization handler result.
*
* @return the {@link Result} indicating whether processing should continue or fail.
*/
public Result result() {
return result;
}
/**
* Retrieves an unmodifiable list of records to be sent to the dead letter queue.
* <p>
* If the list is {@code null}, an empty list is returned.
* </p>
*
* @return an unmodifiable list of {@link ProducerRecord} instances
* for the dead letter queue, or an empty list if no records are available.
*/
public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
if (deadLetterQueueRecords == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(deadLetterQueueRecords);
}
}
}

View File

@ -17,30 +17,27 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
private String deadLetterQueueTopic = null;
/**
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
public Response handleError(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
@ -50,28 +47,12 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
exception
);
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);
return DeserializationHandlerResponse.CONTINUE;
return Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);
private String deadLetterQueueTopic = null;
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context,
final Record<?, ?> record,
final Exception exception) {
log.warn(
"Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
@ -41,12 +47,12 @@ public class LogAndContinueProcessingExceptionHandler implements ProcessingExcep
context.offset(),
exception
);
return ProcessingHandlerResponse.CONTINUE;
return Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}

View File

@ -17,30 +17,27 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
private String deadLetterQueueTopic = null;
/**
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
*/
@SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
public Response handleError(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.error(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
@ -50,28 +47,12 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
exception
);
return DeserializationHandlerResponse.FAIL;
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
log.error(
"Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(),
record.topic(),
record.partition(),
record.offset(),
exception
);
return DeserializationHandlerResponse.FAIL;
return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class);
private String deadLetterQueueTopic = null;
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context,
final Record<?, ?> record,
final Exception exception) {
log.error(
"Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
@ -42,11 +48,12 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException
exception
);
return ProcessingHandlerResponse.FAIL;
return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}

View File

@ -16,13 +16,18 @@
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.api.Record;
import java.util.Collections;
import java.util.List;
/**
* An interface that allows user code to inspect a record that has failed processing
*/
public interface ProcessingExceptionHandler extends Configurable {
/**
* Inspect a record and the exception received
*
@ -34,9 +39,30 @@ public interface ProcessingExceptionHandler extends Configurable {
* The actual exception.
*
* @return Whether to continue or stop processing.
* @deprecated Use {@link #handleError(ErrorHandlerContext, Record, Exception)} instead.
*/
ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);
@Deprecated
default ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
throw new UnsupportedOperationException();
};
/**
* Inspects a record and the exception received during processing.
*
* @param context
* Processing context metadata.
* @param record
* Record where the exception occurred.
* @param exception
* The actual exception.
*
* @return a {@link Response} object
*/
default Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
return new Response(ProcessingExceptionHandler.Result.from(handle(context, record, exception)), Collections.emptyList());
}
@Deprecated
enum ProcessingHandlerResponse {
/** Continue processing. */
CONTINUE(1, "CONTINUE"),
@ -58,4 +84,138 @@ public interface ProcessingExceptionHandler extends Configurable {
this.name = name;
}
}
/**
* Enumeration that describes the response from the exception handler.
*/
enum Result {
/** Resume processing. */
RESUME(1, "RESUME"),
/** Fail processing. */
FAIL(2, "FAIL");
/**
* An english description for the used option. This is for debugging only and may change.
*/
public final String name;
/**
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;
Result(final int id, final String name) {
this.id = id;
this.name = name;
}
/**
* Converts the deprecated enum ProcessingHandlerResponse into the new Result enum.
*
* @param value the old DeserializationHandlerResponse enum value
* @return a {@link ProcessingExceptionHandler.Result} enum value
* @throws IllegalArgumentException if the provided value does not map to a valid {@link ProcessingExceptionHandler.Result}
*/
private static ProcessingExceptionHandler.Result from(final ProcessingHandlerResponse value) {
switch (value) {
case FAIL:
return Result.FAIL;
case CONTINUE:
return Result.RESUME;
default:
throw new IllegalArgumentException("No Result enum found for old value: " + value);
}
}
}
/**
* Represents the result of handling a processing exception.
* <p>
* The {@code Response} class encapsulates a {@link Result},
* indicating whether processing should continue or fail, along with an optional list of
* {@link org.apache.kafka.clients.producer.ProducerRecord} instances to be sent to a dead letter queue.
* </p>
*/
class Response {
private final Result result;
private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
/**
* Constructs a new {@code ProcessingExceptionResponse} object.
*
* @param result the result indicating whether processing should continue or fail;
* must not be {@code null}.
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
*/
private Response(final Result result,
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
this.result = result;
this.deadLetterQueueRecords = deadLetterQueueRecords;
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status.
*/
public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.FAIL, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status.
*/
public static Response fail() {
return fail(Collections.emptyList());
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status.
*/
public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.RESUME, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status.
*/
public static Response resume() {
return resume(Collections.emptyList());
}
/**
* Retrieves the processing handler result.
*
* @return the {@link Result} indicating whether processing should continue or fail.
*/
public Result result() {
return result;
}
/**
* Retrieves an unmodifiable list of records to be sent to the dead letter queue.
* <p>
* If the list is {@code null}, an empty list is returned.
* </p>
*
* @return an unmodifiable list of {@link ProducerRecord} instances
* for the dead letter queue, or an empty list if no records are available.
*/
public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
if (deadLetterQueueRecords == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(deadLetterQueueRecords);
}
}
}

View File

@ -19,6 +19,9 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import java.util.Collections;
import java.util.List;
/**
* Interface that specifies how an exception when attempting to produce a result to
* Kafka should be handled.
@ -55,13 +58,34 @@ public interface ProductionExceptionHandler extends Configurable {
* The exception that occurred during production.
*
* @return Whether to continue or stop processing, or retry the failed operation.
* @deprecated Use {@link #handleError(ErrorHandlerContext, ProducerRecord, Exception)} instead.
*/
@Deprecated
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return handle(record, exception);
}
/**
* Inspect a record that we attempted to produce, and the exception that resulted
* from attempting to produce it and determine to continue or stop processing.
*
* @param context
* The error handler context metadata.
* @param record
* The record that failed to produce.
* @param exception
* The exception that occurred during production.
*
* @return a {@link Response} object
*/
default Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return new Response(Result.from(handle(context, record, exception)), Collections.emptyList());
}
/**
* Handles serialization exception and determine if the process should continue. The default implementation is to
* fail the process.
@ -79,7 +103,7 @@ public interface ProductionExceptionHandler extends Configurable {
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
return ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
}
/**
@ -96,8 +120,11 @@ public interface ProductionExceptionHandler extends Configurable {
* The origin of the serialization exception.
*
* @return Whether to continue or stop processing, or retry the failed operation.
*
* @deprecated Use {@link #handleSerializationError(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
*/
@SuppressWarnings("rawtypes")
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
@ -105,6 +132,30 @@ public interface ProductionExceptionHandler extends Configurable {
return handleSerializationException(record, exception);
}
/**
* Handles serialization exception and determine if the process should continue. The default implementation is to
* fail the process.
*
* @param context
* The error handler context metadata.
* @param record
* The record that failed to serialize.
* @param exception
* The exception that occurred during serialization.
* @param origin
* The origin of the serialization exception.
*
* @return a {@link Response} object
*/
@SuppressWarnings("rawtypes")
default Response handleSerializationError(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
final SerializationExceptionOrigin origin) {
return new Response(Result.from(handleSerializationException(context, record, exception, origin)), Collections.emptyList());
}
@Deprecated
enum ProductionExceptionHandlerResponse {
/** Continue processing.
*
@ -147,10 +198,174 @@ public interface ProductionExceptionHandler extends Configurable {
}
}
/**
* Enumeration that describes the response from the exception handler.
*/
enum Result {
/** Resume processing.
*
* <p> For this case, output records which could not be written successfully are lost.
* Use this option only if you can tolerate data loss.
*/
RESUME(0, "RESUME"),
/** Fail processing.
*
* <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail.
* No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions
* (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed.
*/
FAIL(1, "FAIL"),
/** Retry the failed operation.
*
* <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry
* is started from the last committed offset.
*
* <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
* {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}.
* If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}.
*/
RETRY(2, "RETRY");
/**
* An english description for the used option. This is for debugging only and may change.
*/
public final String name;
/**
* The permanent and immutable id for the used option. This can't change ever.
*/
public final int id;
Result(final int id, final String name) {
this.id = id;
this.name = name;
}
/**
* Converts the deprecated enum ProductionExceptionHandlerResponse into the new Result enum.
*
* @param value the old ProductionExceptionHandlerResponse enum value
* @return a {@link ProductionExceptionHandler.Result} enum value
* @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result}
*/
private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) {
switch (value) {
case FAIL:
return Result.FAIL;
case CONTINUE:
return Result.RESUME;
case RETRY:
return Result.RETRY;
default:
throw new IllegalArgumentException("No Result enum found for old value: " + value);
}
}
}
enum SerializationExceptionOrigin {
/** Serialization exception occurred during serialization of the key. */
KEY,
/** Serialization exception occurred during serialization of the value. */
VALUE
}
/**
* Represents the result of handling a production exception.
* <p>
* The {@code Response} class encapsulates a {@link Result},
* indicating whether processing should continue or fail, along with an optional list of
* {@link ProducerRecord} instances to be sent to a dead letter queue.
* </p>
*/
class Response {
private final Result result;
private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
/**
* Constructs a new {@code Response} object.
*
* @param result the result indicating whether processing should continue or fail;
* must not be {@code null}.
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
*/
private Response(final Result result,
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
this.result = result;
this.deadLetterQueueRecords = deadLetterQueueRecords;
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status.
*/
public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.FAIL, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should fail.
*
* @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status.
*/
public static Response fail() {
return fail(Collections.emptyList());
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
* @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status.
*/
public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
return new Response(Result.RESUME, deadLetterQueueRecords);
}
/**
* Creates a {@code Response} indicating that processing should continue.
*
* @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status.
*/
public static Response resume() {
return resume(Collections.emptyList());
}
/**
* Creates a {@code Response} indicating that processing should retry.
*
* @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RETRY} status.
*/
public static Response retry() {
return new Response(Result.RETRY, Collections.emptyList());
}
/**
* Retrieves the production exception handler result.
*
* @return the {@link Result} indicating whether processing should continue, fail or retry.
*/
public Result result() {
return result;
}
/**
* Retrieves an unmodifiable list of records to be sent to the dead letter queue.
* <p>
* If the list is {@code null}, an empty list is returned.
* </p>
*
* @return an unmodifiable list of {@link ProducerRecord} instances
* for the dead letter queue, or an empty list if no records are available.
*/
public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
if (deadLetterQueueRecords == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(deadLetterQueueRecords);
}
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.List;
/**
* {@code ExceptionHandlerUtils} Contains utilities method that could be used by all exception handlers
*/
public class ExceptionHandlerUtils {
public static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception";
public static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace";
public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message";
public static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic";
public static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition";
public static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset";
public static boolean shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) {
return deadLetterQueueTopicName != null;
}
/**
* If required, return Dead Letter Queue records for the provided exception
*
* @param key Serialized key for the records
* @param value Serialized value for the records
* @param context ErrorHandlerContext of the exception
* @param exception Thrown exception
* @return A list of Dead Letter Queue records to produce
*/
public static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
final byte[] key,
final byte[] value,
final ErrorHandlerContext context,
final Exception exception) {
if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
return Collections.emptyList();
}
return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception));
}
/**
* Build dead letter queue record for the provided exception.
*
* @param key Serialized key for the record.
* @param value Serialized value for the record.
* @param context error handler context of the exception.
* @return A dead letter queue record to produce.
*/
public static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
final byte[] key,
final byte[] value,
final ErrorHandlerContext context,
final Exception e) {
if (deadLetterQueueTopicName == null) {
throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, value);
final StringWriter stackTraceStringWriter = new StringWriter();
final PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
e.printStackTrace(stackTracePrintWriter);
try (final StringSerializer stringSerializer = new StringSerializer()) {
producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString()));
producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, stringSerializer.serialize(null, e.getMessage()));
producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, stringSerializer.serialize(null, stackTraceStringWriter.toString()));
producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME, stringSerializer.serialize(null, context.topic()));
producerRecord.headers().add(HEADER_ERRORS_PARTITION_NAME, stringSerializer.serialize(null, String.valueOf(context.partition())));
producerRecord.headers().add(HEADER_ERRORS_OFFSET_NAME, stringSerializer.serialize(null, String.valueOf(context.offset())));
}
return producerRecord;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
@ -220,11 +221,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.recordContext().sourceRawValue()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
final ProcessingExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
processingExceptionHandler.handle(errorHandlerContext, record, processingException),
"Invalid ProductionExceptionHandler response."
processingExceptionHandler.handleError(errorHandlerContext, record, processingException),
"Invalid ProcessingExceptionHandler response."
);
} catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
@ -242,7 +243,21 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
);
}
if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
final RecordCollector collector = ((RecordCollector.Supplier) internalProcessorContext).recordCollector();
for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
collector.send(
deadLetterQueueRecord.key(),
deadLetterQueueRecord.value(),
name(),
internalProcessorContext,
deadLetterQueueRecord
);
}
}
if (response.result() == ProcessingExceptionHandler.Result.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
" continue after a processing error, please set the " +

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
@ -48,6 +49,12 @@ public interface RecordCollector {
final InternalProcessorContext<Void, Void> context,
final StreamPartitioner<? super K, ? super V> partitioner);
<K, V> void send(K key,
V value,
String processorNodeId,
InternalProcessorContext<?, ?> context,
ProducerRecord<byte[], byte[]> serializedRecord);
/**
* Initialize the internal {@link Producer}; note this function should be made idempotent
*

View File

@ -41,7 +41,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@ -263,6 +262,15 @@ public class RecordCollectorImpl implements RecordCollector {
// freeing raw records in the context to reduce memory pressure
freeRawInputRecordFromContext(context);
send(key, value, processorNodeId, context, serializedRecord);
}
public <K, V> void send(final K key,
final V value,
final String processorNodeId,
final InternalProcessorContext<?, ?> context,
final ProducerRecord<byte[], byte[]> serializedRecord) {
streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging offsets or new exceptions
@ -278,16 +286,16 @@ public class RecordCollectorImpl implements RecordCollector {
log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
}
if (!topic.endsWith("-changelog")) {
if (!serializedRecord.topic().endsWith("-changelog")) {
// we may not have created a sensor during initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the sensor for this topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
serializedRecord.topic(),
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
topic,
serializedRecord.topic(),
context.metrics()
)
);
@ -299,7 +307,7 @@ public class RecordCollectorImpl implements RecordCollector {
}
} else {
recordSendError(
topic,
serializedRecord.topic(),
exception,
serializedRecord,
context,
@ -307,7 +315,7 @@ public class RecordCollectorImpl implements RecordCollector {
);
// KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default
log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, serializedRecord.timestamp(), serializedRecord.topic(), serializedRecord.partition());
}
} catch (final RuntimeException fatal) {
sendException.set(new StreamsException("Producer.send `Callback` failed", fatal));
@ -329,16 +337,16 @@ public class RecordCollectorImpl implements RecordCollector {
final Integer partition,
final Long timestamp,
final String processorNodeId,
final InternalProcessorContext<Void, Void> context,
final InternalProcessorContext<?, ?> context,
final Exception serializationException) {
log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
final ProductionExceptionHandlerResponse response;
final ProductionExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
productionExceptionHandler.handleSerializationException(
productionExceptionHandler.handleSerializationError(
errorHandlerContext(context, processorNodeId),
record,
serializationException,
@ -365,7 +373,20 @@ public class RecordCollectorImpl implements RecordCollector {
);
}
if (maybeFailResponse(response) == ProductionExceptionHandlerResponse.FAIL) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
this.send(
deadLetterQueueRecord.key(),
deadLetterQueueRecord.value(),
processorNodeId,
context,
deadLetterQueueRecord
);
}
}
if (maybeFailResponse(response.result()) == ProductionExceptionHandler.Result.FAIL) {
throw new StreamsException(
String.format(
"Unable to serialize record. ProducerRecord(topic=[%s], partition=[%d], timestamp=[%d]",
@ -385,7 +406,7 @@ public class RecordCollectorImpl implements RecordCollector {
droppedRecordsSensor.record();
}
private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<Void, Void> context,
private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<?, ?> context,
final String processorNodeId) {
final RecordContext recordContext = context != null ? context.recordContext() : null;
@ -442,7 +463,7 @@ public class RecordCollectorImpl implements RecordCollector {
private void recordSendError(final String topic,
final Exception productionException,
final ProducerRecord<byte[], byte[]> serializedRecord,
final InternalProcessorContext<Void, Void> context,
final InternalProcessorContext<?, ?> context,
final String processorNodeId) {
String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, productionException.toString());
@ -462,10 +483,10 @@ public class RecordCollectorImpl implements RecordCollector {
// TransactionAbortedException is only thrown after `abortTransaction()` was called,
// so it's only a followup error, and Kafka Streams is already handling the original error
} else {
final ProductionExceptionHandlerResponse response;
final ProductionExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
productionExceptionHandler.handle(
productionExceptionHandler.handleError(
errorHandlerContext(context, processorNodeId),
serializedRecord,
productionException
@ -490,14 +511,27 @@ public class RecordCollectorImpl implements RecordCollector {
return;
}
if (productionException instanceof RetriableException && response == ProductionExceptionHandlerResponse.RETRY) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
this.send(
deadLetterQueueRecord.key(),
deadLetterQueueRecord.value(),
processorNodeId,
context,
deadLetterQueueRecord
);
}
}
if (productionException instanceof RetriableException && response.result() == ProductionExceptionHandler.Result.RETRY) {
errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
"`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
} else {
if (maybeFailResponse(response) == ProductionExceptionHandlerResponse.FAIL) {
if (maybeFailResponse(response.result()) == ProductionExceptionHandler.Result.FAIL) {
errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
sendException.set(new StreamsException(errorMessage, productionException));
} else {
@ -510,12 +544,12 @@ public class RecordCollectorImpl implements RecordCollector {
log.error(errorMessage, productionException);
}
private ProductionExceptionHandlerResponse maybeFailResponse(final ProductionExceptionHandlerResponse response) {
if (response == ProductionExceptionHandlerResponse.RETRY) {
private ProductionExceptionHandler.Result maybeFailResponse(final ProductionExceptionHandler.Result result) {
if (result == ProductionExceptionHandler.Result.RETRY) {
log.warn("ProductionExceptionHandler returned RETRY for a non-retriable exception. Will treat it as FAIL.");
return ProductionExceptionHandlerResponse.FAIL;
return ProductionExceptionHandler.Result.FAIL;
} else {
return response;
return result;
}
}

View File

@ -17,17 +17,18 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
@ -50,7 +51,7 @@ public class RecordDeserializer {
/**
* @throws StreamsException if a deserialization error occurs and the deserialization callback returns
* {@link DeserializationHandlerResponse#FAIL FAIL}
* {@link DeserializationExceptionHandler.Result#FAIL FAIL}
* or throws an exception itself
*/
ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processorContext,
@ -100,11 +101,11 @@ public class RecordDeserializer {
rawRecord.value()
);
final DeserializationHandlerResponse response;
final DeserializationExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException),
"Invalid DeserializationExceptionHandler response."
deserializationExceptionHandler.handleError(errorHandlerContext, rawRecord, deserializationException),
"Invalid DeserializationExceptionResponse response."
);
} catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other languages
@ -118,7 +119,21 @@ public class RecordDeserializer {
throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
}
if (response == DeserializationHandlerResponse.FAIL) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
final RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector();
for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
collector.send(
deadLetterQueueRecord.key(),
deadLetterQueueRecord.value(),
sourceNodeName,
(InternalProcessorContext) processorContext,
deadLetterQueueRecord
);
}
}
if (response.result() == DeserializationExceptionHandler.Result.FAIL) {
throw new StreamsException("Deserialization exception handler is set to fail upon" +
" a deserialization error. If you would rather have the streaming pipeline" +
" continue after a deserialization error, please set the " +

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@ -945,10 +946,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
recordContext.sourceRawValue()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
final ProcessingExceptionHandler.Response processingExceptionResponse;
try {
response = Objects.requireNonNull(
processingExceptionHandler.handle(errorHandlerContext, null, processingException),
processingExceptionResponse = Objects.requireNonNull(
processingExceptionHandler.handleError(errorHandlerContext, null, processingException),
"Invalid ProcessingExceptionHandler response."
);
} catch (final Exception fatalUserException) {
@ -963,7 +964,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
throw new FailedProcessingException("Fatal user code error in processing error callback", node.name(), fatalUserException);
}
if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = processingExceptionResponse.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
final RecordCollector collector = ((RecordCollector.Supplier) processorContext).recordCollector();
for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
collector.send(
deadLetterQueueRecord.key(),
deadLetterQueueRecord.value(),
node.name(),
processorContext,
deadLetterQueueRecord);
}
}
if (processingExceptionResponse.result() == ProcessingExceptionHandler.Result.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the streaming pipeline" +
" continue after a processing error, please set the " +

View File

@ -1683,6 +1683,11 @@ public class StreamsConfigTest {
"Please set group.protocol=classic or remove group.instance.id from the configuration."));
}
public void shouldSetDefaultDeadLetterQueue() {
final StreamsConfig config = new StreamsConfig(props);
assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collections;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class ExceptionHandlerUtilsTest {
@Test
public void checkDeadLetterQueueRecords() {
final StringSerializer stringSerializer = new StringSerializer();
final StringDeserializer stringDeserializer = new StringDeserializer();
final MockRecordCollector collector = new MockRecordCollector();
final String key = "key";
final String value = "value";
final InternalProcessorContext<Object, Object> internalProcessorContext = new InternalMockProcessorContext<>(
new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
collector
);
internalProcessorContext.setRecordContext(new ProcessorRecordContext(
1L,
2,
3,
"source",
new RecordHeaders(Collections.singletonList(
new RecordHeader("sourceHeader", stringSerializer.serialize(null, "hello world")))),
key.getBytes(),
value.getBytes()
));
final ErrorHandlerContext errorHandlerContext = getErrorHandlerContext(internalProcessorContext);
final NullPointerException exception = new NullPointerException("Oopsie!");
final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords("dlq", errorHandlerContext.sourceRawKey(), errorHandlerContext.sourceRawValue(), errorHandlerContext, exception);
final Iterator<ProducerRecord<byte[], byte[]>> iterator = dlqRecords.iterator();
assertTrue(iterator.hasNext());
final ProducerRecord<byte[], byte[]> dlqRecord = iterator.next();
final Headers headers = dlqRecord.headers();
assertFalse(iterator.hasNext()); // There should be only one record
assertEquals("dlq", dlqRecord.topic());
assertEquals(errorHandlerContext.timestamp(), dlqRecord.timestamp());
assertEquals(1, dlqRecord.timestamp());
assertEquals(key, new String(dlqRecord.key()));
assertEquals(value, new String(dlqRecord.value()));
assertEquals(exception.toString(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
assertEquals(exception.getMessage(), stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
assertEquals("source", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
assertEquals("3", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
assertEquals("2", stringDeserializer.deserialize(null, headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
}
@Test
public void doNotBuildDeadLetterQueueRecordsIfNotConfigured() {
final NullPointerException exception = new NullPointerException("Oopsie!");
final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords = ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(null, null, null, null, exception);
final Iterator<ProducerRecord<byte[], byte[]>> iterator = dlqRecords.iterator();
assertFalse(iterator.hasNext());
}
private static DefaultErrorHandlerContext getErrorHandlerContext(final InternalProcessorContext<Object, Object> internalProcessorContext) {
return new DefaultErrorHandlerContext(
null,
internalProcessorContext.topic(),
internalProcessorContext.partition(),
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
internalProcessorContext.timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue());
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
@ -29,6 +30,8 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@ -39,7 +42,9 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Test;
@ -52,10 +57,13 @@ import org.mockito.quality.Strictness;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.streams.errors.ProcessingExceptionHandler.Response;
import static org.apache.kafka.streams.errors.ProcessingExceptionHandler.Result;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -103,7 +111,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL, internalProcessorContext, false));
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.fail(), internalProcessorContext, false));
final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@ -120,7 +128,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, false));
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), internalProcessorContext, false));
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
}
@ -147,7 +155,7 @@ public class ProcessorNodeTest {
assertEquals(ignoredExceptionCause, runtimeException.getCause().getClass());
assertEquals(ignoredExceptionCauseMessage, runtimeException.getCause().getMessage());
verify(processingExceptionHandler, never()).handle(any(), any(), any());
verify(processingExceptionHandler, never()).handleError(any(), any(), any());
}
@Test
@ -156,7 +164,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE, internalProcessorContext, true));
node.init(internalProcessorContext, new ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(), internalProcessorContext, true));
final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@ -166,6 +174,58 @@ public class ProcessorNodeTest {
assertEquals(NAME, failedProcessingException.failedProcessorNodeName());
}
@Test
public void shouldBuildDeadLetterQueueRecordsInDefaultProcessingExceptionHandler() {
final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("processor",
(Processor<Object, Object, Object, Object>) record -> {
throw new NullPointerException("Oopsie!");
}, Collections.emptySet());
final MockRecordCollector collector = new MockRecordCollector();
final InternalProcessorContext<Object, Object> internalProcessorContext =
new InternalMockProcessorContext<>(
new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
collector
);
final ProcessingExceptionHandler processingExceptionHandler = new LogAndFailProcessingExceptionHandler();
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
node.init(internalProcessorContext, processingExceptionHandler);
assertThrows(RuntimeException.class,
() -> node.process(new Record<>("hello", "world", 1L)));
assertEquals(1, collector.collected().size());
assertEquals("dlq", collector.collected().get(0).topic());
assertEquals("sourceKey", new String((byte[]) collector.collected().get(0).key()));
assertEquals("sourceValue", new String((byte[]) collector.collected().get(0).value()));
}
@Test
public void shouldBuildDeadLetterQueueRecordsInLogAndContinueProcessingExceptionHandler() {
final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("processor",
(Processor<Object, Object, Object, Object>) record -> {
throw new NullPointerException("Oopsie!");
}, Collections.emptySet());
final MockRecordCollector collector = new MockRecordCollector();
final InternalProcessorContext<Object, Object> internalProcessorContext =
new InternalMockProcessorContext<>(
new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
collector
);
final ProcessingExceptionHandler processingExceptionHandler = new LogAndContinueProcessingExceptionHandler();
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
node.init(internalProcessorContext, processingExceptionHandler);
node.process(new Record<>("hello", "world", 0L));
assertEquals(1, collector.collected().size());
assertEquals("dlq", collector.collected().get(0).topic());
assertEquals("sourceKey", new String((byte[]) collector.collected().get(0).key()));
assertEquals("sourceValue", new String((byte[]) collector.collected().get(0).value()));
}
private static class ExceptionalProcessor implements Processor<Object, Object, Object, Object> {
@Override
public void init(final ProcessorContext<Object, Object> context) {
@ -318,6 +378,64 @@ public class ProcessorNodeTest {
assertTrue(se.getMessage().contains("pname"));
}
@Test
void shouldFailWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.fail(records);
assertEquals(Result.FAIL, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldFailWithoutDeadLetterQueueRecords() {
final Response response = Response.fail();
assertEquals(Result.FAIL, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldResumeWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.resume(records);
assertEquals(Result.RESUME, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldResumeWithoutDeadLetterQueueRecords() {
final Response response = Response.resume();
assertEquals(Result.RESUME, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldNotBeModifiable() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.fail(records);
assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
}
@Test
void shouldReturnsEmptyList() {
final Response response = Response.fail();
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@SuppressWarnings("unchecked")
private InternalProcessorContext<Object, Object> mockInternalProcessorContext() {
final InternalProcessorContext<Object, Object> internalProcessorContext = mock(InternalProcessorContext.class, withSettings().strictness(Strictness.LENIENT));
@ -342,12 +460,12 @@ public class ProcessorNodeTest {
}
public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler {
private final ProcessingExceptionHandler.ProcessingHandlerResponse response;
private final Response response;
private final InternalProcessorContext<Object, Object> internalProcessorContext;
private final boolean shouldThrowException;
public ProcessingExceptionHandlerMock(final ProcessingExceptionHandler.ProcessingHandlerResponse response,
public ProcessingExceptionHandlerMock(final Response response,
final InternalProcessorContext<Object, Object> internalProcessorContext,
final boolean shouldThrowException) {
this.response = response;
@ -356,7 +474,7 @@ public class ProcessorNodeTest {
}
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
assertEquals(internalProcessorContext.topic(), context.topic());
assertEquals(internalProcessorContext.partition(), context.partition());
assertEquals(internalProcessorContext.offset(), context.offset());

View File

@ -47,10 +47,10 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@ -89,6 +89,8 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.errors.ProductionExceptionHandler.Response;
import static org.apache.kafka.streams.errors.ProductionExceptionHandler.Result;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
@ -1201,7 +1203,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@ -1228,7 +1230,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@ -1252,7 +1254,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@ -1276,7 +1278,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(new RuntimeException("KABOOM!")),
new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId
@ -1347,7 +1349,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.FAIL),
Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId
@ -1377,7 +1379,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId
@ -1400,7 +1402,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.RETRY),
Optional.of(ProductionExceptionHandler.Response.retry()),
context,
sinkNodeName,
taskId
@ -1535,7 +1537,7 @@ public class RecordCollectorTest {
public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId,
@ -1564,7 +1566,7 @@ public class RecordCollectorTest {
public void shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.FAIL),
Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId,
@ -1585,7 +1587,7 @@ public class RecordCollectorTest {
public void shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock(
Optional.of(ProductionExceptionHandlerResponse.FAIL),
Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId,
@ -1795,7 +1797,7 @@ public class RecordCollectorTest {
public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE))
new ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume()))
);
collector.initialize();
@ -1834,6 +1836,58 @@ public class RecordCollectorTest {
collector.flush(); // need to call flush() to check for internal exceptions
}
@Test
public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandlerDuringDeserialization() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final DefaultProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler();
productionExceptionHandler.configure(Collections.singletonMap(
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
"dlq"
));
final RecordCollector collector = newRecordCollector(productionExceptionHandler);
collector.initialize();
assertThat(mockProducer.history().isEmpty(), equalTo(true));
assertThrows(
StreamsException.class,
() ->
collector.send(topic, "hello", "world", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context)
);
assertEquals(1, mockProducer.history().size());
assertEquals("dlq", mockProducer.history().get(0).topic());
}
}
@Test
public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception);
final MockProducer<byte[], byte[]> mockProducer = (MockProducer<byte[], byte[]>) streamProducer.kafkaProducer();
final DefaultProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler();
productionExceptionHandler.configure(Collections.singletonMap(
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
"dlq"
));
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
collector.initialize();
collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
assertThrows(StreamsException.class, collector::flush);
assertEquals(1, mockProducer.history().size());
assertEquals("dlq", mockProducer.history().get(0).topic());
}
@Test
public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
final TaskId taskId1 = new TaskId(0, 0);
@ -1954,6 +2008,116 @@ public class RecordCollectorTest {
}
}
public void shouldCallOldImplementationExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception);
final OldProductionExceptionHandlerImplementation productionExceptionHandler = new OldProductionExceptionHandlerImplementation();
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
collector.initialize();
collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
final Exception thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause());
}
@Test
public void shouldCallOldImplementationWithRecordContextExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception);
final OldProductionExceptionHandlerWithRecordContextImplementation productionExceptionHandler = new OldProductionExceptionHandlerWithRecordContextImplementation();
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
collector.initialize();
collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
final Exception thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause());
}
@Test
void shouldFailWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final ProductionExceptionHandler.Response response = ProductionExceptionHandler.Response.fail(records);
assertEquals(Result.FAIL, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldFailWithoutDeadLetterQueueRecords() {
final ProductionExceptionHandler.Response response = ProductionExceptionHandler.Response.fail();
assertEquals(Result.FAIL, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldResumeWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.resume(records);
assertEquals(Result.RESUME, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldResumeWithoutDeadLetterQueueRecords() {
final Response response = Response.resume();
assertEquals(Result.RESUME, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldRetryWithoutDeadLetterQueueRecords() {
final Response response = Response.retry();
assertEquals(Result.RETRY, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldNotBeModifiable() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.fail(records);
assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
}
@Test
void shouldReturnsEmptyList() {
final Response response = Response.fail();
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) {
return new RecordCollectorImpl(
logContext,
@ -1978,9 +2142,13 @@ public class RecordCollectorTest {
new MockProducer<>(cluster, true, null, byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
if (record.topic().equals("dlq")) {
return super.send(record, callback);
} else {
callback.onCompletion(null, exception);
return null;
}
}
},
AT_LEAST_ONCE,
Time.SYSTEM,
@ -2023,7 +2191,7 @@ public class RecordCollectorTest {
}
public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler {
private final Optional<ProductionExceptionHandlerResponse> response;
private final Optional<Response> response;
private boolean shouldThrowException;
private InternalProcessorContext<Void, Void> expectedContext;
private String expectedProcessorNodeId;
@ -2040,11 +2208,11 @@ public class RecordCollectorTest {
this.expectedSerializationExceptionOrigin = null;
}
public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response) {
public ProductionExceptionHandlerMock(final Optional<Response> response) {
this.response = response;
}
public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response,
public ProductionExceptionHandlerMock(final Optional<Response> response,
final InternalProcessorContext<Void, Void> context,
final String processorNodeId,
final TaskId taskId) {
@ -2064,7 +2232,7 @@ public class RecordCollectorTest {
this.shouldThrowException = shouldThrowException;
}
public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response,
public ProductionExceptionHandlerMock(final Optional<Response> response,
final InternalProcessorContext<Void, Void> context,
final String processorNodeId,
final TaskId taskId,
@ -2075,7 +2243,7 @@ public class RecordCollectorTest {
}
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
public Response handleError(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
assertInputs(context, exception);
@ -2087,7 +2255,7 @@ public class RecordCollectorTest {
@SuppressWarnings("rawtypes")
@Override
public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
public Response handleSerializationError(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
final SerializationExceptionOrigin origin) {
@ -2115,4 +2283,33 @@ public class RecordCollectorTest {
assertEquals("KABOOM!", exception.getMessage());
}
}
public static class OldProductionExceptionHandlerImplementation implements ProductionExceptionHandler {
@SuppressWarnings("deprecation")
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
public static class OldProductionExceptionHandlerWithRecordContextImplementation implements ProductionExceptionHandler {
@SuppressWarnings("deprecation")
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
return ProductionExceptionHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
}

View File

@ -17,32 +17,44 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.errors.DeserializationExceptionHandler.Response;
import static org.apache.kafka.streams.errors.DeserializationExceptionHandler.Result;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RecordDeserializerTest {
private final String sourceNodeName = "source-node";
@ -108,7 +120,7 @@ public class RecordDeserializerTest {
"value"
),
new DeserializationExceptionHandlerMock(
Optional.of(DeserializationHandlerResponse.FAIL),
Optional.of(DeserializationExceptionHandler.Response.fail()),
rawRecord,
sourceNodeName,
taskId
@ -147,7 +159,7 @@ public class RecordDeserializerTest {
"value"
),
new DeserializationExceptionHandlerMock(
Optional.of(DeserializationHandlerResponse.CONTINUE),
Optional.of(DeserializationExceptionHandler.Response.resume()),
rawRecord,
sourceNodeName,
taskId
@ -188,7 +200,7 @@ public class RecordDeserializerTest {
);
assertEquals("Fatal user code error in deserialization error callback", exception.getMessage());
assertInstanceOf(NullPointerException.class, exception.getCause());
assertEquals("Invalid DeserializationExceptionHandler response.", exception.getCause().getMessage());
assertEquals("Invalid DeserializationExceptionResponse response.", exception.getCause().getMessage());
}
}
@ -222,6 +234,144 @@ public class RecordDeserializerTest {
}
}
@Test
public void shouldBuildDeadLetterQueueRecordsInDefaultDeserializationException() {
try (Metrics metrics = new Metrics()) {
final MockRecordCollector collector = new MockRecordCollector();
final InternalProcessorContext<Object, Object> internalProcessorContext =
new InternalMockProcessorContext<>(
new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
collector
);
final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndFailExceptionHandler();
deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
assertThrows(StreamsException.class, () -> RecordDeserializer.handleDeserializationFailure(
deserializationExceptionHandler,
internalProcessorContext,
new RuntimeException(new NullPointerException("Oopsie")),
new ConsumerRecord<>("source",
0,
0,
123,
TimestampType.CREATE_TIME,
-1,
-1,
"hello".getBytes(StandardCharsets.UTF_8),
"world".getBytes(StandardCharsets.UTF_8),
new RecordHeaders(),
Optional.empty()),
new LogContext().logger(this.getClass()),
metrics.sensor("dropped-records"),
"sourceNode"
));
assertEquals(1, collector.collected().size());
assertEquals("dlq", collector.collected().get(0).topic());
assertEquals("hello", new String((byte[]) collector.collected().get(0).key()));
assertEquals("world", new String((byte[]) collector.collected().get(0).value()));
}
}
@Test
public void shouldBuildDeadLetterQueueRecordsInLogAndContinueDeserializationException() {
try (Metrics metrics = new Metrics()) {
final MockRecordCollector collector = new MockRecordCollector();
final InternalProcessorContext<Object, Object> internalProcessorContext =
new InternalMockProcessorContext<>(
new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
collector
);
final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndContinueExceptionHandler();
deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
RecordDeserializer.handleDeserializationFailure(
deserializationExceptionHandler,
internalProcessorContext,
new RuntimeException(new NullPointerException("Oopsie")),
new ConsumerRecord<>("source",
0,
0,
123,
TimestampType.CREATE_TIME,
-1,
-1,
"hello".getBytes(StandardCharsets.UTF_8),
"world".getBytes(StandardCharsets.UTF_8),
new RecordHeaders(),
Optional.empty()),
new LogContext().logger(this.getClass()),
metrics.sensor("dropped-records"),
"sourceNode"
);
assertEquals(1, collector.collected().size());
assertEquals("dlq", collector.collected().get(0).topic());
assertEquals("hello", new String((byte[]) collector.collected().get(0).key()));
assertEquals("world", new String((byte[]) collector.collected().get(0).value()));
}
}
@Test
void shouldFailWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.fail(records);
assertEquals(Result.FAIL, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldFailWithoutDeadLetterQueueRecords() {
final Response response = DeserializationExceptionHandler.Response.fail();
assertEquals(Result.FAIL, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldResumeWithDeadLetterQueueRecords() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.resume(records);
assertEquals(Result.RESUME, response.result());
assertEquals(1, response.deadLetterQueueRecords().size());
assertEquals(record, response.deadLetterQueueRecords().get(0));
}
@Test
void shouldResumeWithoutDeadLetterQueueRecords() {
final Response response = Response.resume();
assertEquals(Result.RESUME, response.result());
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
@Test
void shouldNotBeModifiable() {
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
final List<ProducerRecord<byte[], byte[]>> records = Collections.singletonList(record);
final Response response = Response.fail(records);
assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
}
@Test
void shouldReturnsEmptyList() {
final Response response = Response.fail();
assertTrue(response.deadLetterQueueRecords().isEmpty());
}
static class TheSourceNode extends SourceNode<Object, Object> {
private final boolean keyThrowsException;
private final boolean valueThrowsException;
@ -258,12 +408,12 @@ public class RecordDeserializerTest {
}
public static class DeserializationExceptionHandlerMock implements DeserializationExceptionHandler {
private final Optional<DeserializationHandlerResponse> response;
private final Optional<Response> response;
private final ConsumerRecord<byte[], byte[]> expectedRecord;
private final String expectedProcessorNodeId;
private final TaskId expectedTaskId;
public DeserializationExceptionHandlerMock(final Optional<DeserializationHandlerResponse> response,
public DeserializationExceptionHandlerMock(final Optional<Response> response,
final ConsumerRecord<byte[], byte[]> record,
final String processorNodeId,
final TaskId taskId) {
@ -274,7 +424,7 @@ public class RecordDeserializerTest {
}
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
public Response handleError(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
assertEquals(expectedRecord.topic(), context.topic());

View File

@ -3032,7 +3032,7 @@ public class StreamTaskTest {
public static class CrashingProcessingExceptionHandler implements ProcessingExceptionHandler {
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
}
@ -3044,7 +3044,7 @@ public class StreamTaskTest {
public static class NullProcessingExceptionHandler implements ProcessingExceptionHandler {
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
return null;
}

View File

@ -81,6 +81,23 @@ public class MockRecordCollector implements RecordCollector {
);
}
@Override
public <K, V> void send(final K key,
final V value,
final String processorNodeId,
final InternalProcessorContext<?, ?> context,
final ProducerRecord<byte[], byte[]> serializedRecord) {
// Building a new ProducerRecord for key & value type conversion
collected.add(new ProducerRecord<>(
serializedRecord.topic(),
serializedRecord.partition(),
serializedRecord.timestamp(),
key,
value,
serializedRecord.headers())
);
}
@Override
public void initialize() {}