diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index af67c8f03a4..82d32581255 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Record;
/**
- * This interface allows user code to inspect the context of a record that has failed processing.
+ * This interface allows user code to inspect the context of a record that has failed during processing.
+ *
+ * <p> {@code ErrorHandlerContext} instances are passed into {@link DeserializationExceptionHandler},
+ * {@link ProcessingExceptionHandler}, or {@link ProductionExceptionHandler} dependent on what error occurred.
*/
public interface ErrorHandlerContext {
/**
@@ -42,6 +45,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid topic name, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no associated input record,
+ * and thus no topic name is available.
*
* @return the topic name
*/
@@ -58,6 +63,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid partition ID, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no associated input record,
+ * and thus no partition is available.
*
* @return the partition ID
*/
@@ -74,6 +81,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid offset, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no associated input record,
+ * and thus no offset is available.
*
* @return the offset
*/
@@ -90,6 +99,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide valid headers, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no associated input record,
+ * and thus no headers are available.
*
* @return the headers
*/
@@ -110,7 +121,10 @@ public interface ErrorHandlerContext {
TaskId taskId();
/**
- * Return the current timestamp.
+ * Return the current timestamp; could be {@code -1} if it is not available.
+ *
+ * <p> For example, when writing into a changelog topic, there is no associated input record,
+ * and thus no timestamp is available.
*
* <p> If it is triggered while processing a record streamed from the source processor,
* timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
@@ -129,8 +143,7 @@ public interface ErrorHandlerContext {
*
*
* <p> If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
- * current rawRecord
- * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}
+ * current rawRecord {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
*
* @return the timestamp
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index fc44e9c95fb..efaa6d57e7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -35,7 +35,7 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final TaskId taskId;
private final long timestamp;
- private ProcessorContext processorContext;
+ private final ProcessorContext processorContext;
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final String topic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 6a96e4823b6..bd589a8b7a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
@@ -46,6 +47,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
+import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -252,40 +254,53 @@ public class RecordCollectorImpl implements RecordCollector {
final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
- // if there's already an exception record, skip logging offsets or new exceptions
- if (sendException.get() != null) {
- return;
- }
-
- if (exception == null) {
- final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
- if (metadata.offset() >= 0L) {
- offsets.put(tp, metadata.offset());
- } else {
- log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
+ try {
+ // if there's already an exception record, skip logging offsets or new exceptions
+ if (sendException.get() != null) {
+ return;
}
- if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor during initialization if the node uses dynamic topic routing,
- // as all topics are not known up front, so create the sensor for this topic if absent
- final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
- topic,
- t -> TopicMetrics.producedSensor(
- Thread.currentThread().getName(),
- taskId.toString(),
- processorNodeId,
+ if (exception == null) {
+ final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
+ }
+
+ if (!topic.endsWith("-changelog")) {
+ // we may not have created a sensor during initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the sensor for this topic if absent
+ final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
- context.metrics()
- )
+ t -> TopicMetrics.producedSensor(
+ Thread.currentThread().getName(),
+ taskId.toString(),
+ processorNodeId,
+ topic,
+ context.metrics()
+ )
+ );
+ final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
+ topicProducedSensor.record(
+ bytesProduced,
+ context.currentSystemTimeMs()
+ );
+ }
+ } else {
+ recordSendError(
+ topic,
+ exception,
+ serializedRecord,
+ context,
+ processorNodeId
);
- final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
- topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
- }
- } else {
- recordSendError(topic, exception, serializedRecord, context, processorNodeId);
- // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default
- log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+ // KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default
+ log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+ }
+ } catch (final RuntimeException fatal) {
+ sendException.set(new StreamsException("Producer.send `Callback` failed", fatal));
}
});
}
@@ -302,22 +317,17 @@ public class RecordCollectorImpl implements RecordCollector {
final RuntimeException serializationException) {
log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
- final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
- null, // only required to pass for DeserializationExceptionHandler
- context.recordContext().topic(),
- context.recordContext().partition(),
- context.recordContext().offset(),
- context.recordContext().headers(),
- processorNodeId,
- taskId,
- context.recordContext().timestamp()
- );
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
final ProductionExceptionHandlerResponse response;
try {
response = Objects.requireNonNull(
- productionExceptionHandler.handleSerializationException(errorHandlerContext, record, serializationException, origin),
+ productionExceptionHandler.handleSerializationException(
+ errorHandlerContext(context, processorNodeId),
+ record,
+ serializationException,
+ origin
+ ),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {
@@ -325,7 +335,7 @@ public class RecordCollectorImpl implements RecordCollector {
String.format(
"Production error callback failed after serialization error for record %s: %s",
origin.toString().toLowerCase(Locale.ROOT),
- errorHandlerContext
+ errorHandlerContext(context, processorNodeId)
),
serializationException
);
@@ -352,6 +362,33 @@ public class RecordCollectorImpl implements RecordCollector {
droppedRecordsSensor.record();
}
+ private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<Void, Void> context,
+ final String processorNodeId) {
+ final RecordContext recordContext = context != null ? context.recordContext() : null;
+
+ return recordContext != null ?
+ new DefaultErrorHandlerContext(
+ context,
+ recordContext.topic(),
+ recordContext.partition(),
+ recordContext.offset(),
+ recordContext.headers(),
+ processorNodeId,
+ taskId,
+ recordContext.timestamp()
+ ) :
+ new DefaultErrorHandlerContext(
+ context,
+ null,
+ -1,
+ -1,
+ new RecordHeaders(),
+ processorNodeId,
+ taskId,
+ -1L
+ );
+ }
+
private <KV> StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final KV keyOrValue,
@@ -400,21 +437,14 @@ public class RecordCollectorImpl implements RecordCollector {
"`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
} else {
- final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
- null, // only required to pass for DeserializationExceptionHandler
- context.recordContext().topic(),
- context.recordContext().partition(),
- context.recordContext().offset(),
- context.recordContext().headers(),
- processorNodeId,
- taskId,
- context.recordContext().timestamp()
- );
-
final ProductionExceptionHandlerResponse response;
try {
response = Objects.requireNonNull(
- productionExceptionHandler.handle(errorHandlerContext, serializedRecord, productionException),
+ productionExceptionHandler.handle(
+ errorHandlerContext(context, processorNodeId),
+ serializedRecord,
+ productionException
+ ),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index fe747171ab7..316ff8fa201 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -1580,6 +1580,98 @@ public class RecordCollectorTest {
}
}
+ @Test
+ public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
+ try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamsProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ assertThrows(
+ StreamsException.class, // should not crash with NullPointerException
+ () -> collector.send(
+ topic,
+ "key",
+ "val",
+ null,
+ 0,
+ null,
+ errorSerializer,
+ stringSerializer,
+ sinkNodeName,
+ null // pass `null` context for testing
+ )
+ );
+ }
+ }
+
+ @Test
+ public void shouldNotFailIfRecordContextIsNotAvailableOnSerializationError() {
+ try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamsProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ // RecordContext is null when writing into a changelog topic
+ context.setRecordContext(null);
+ assertThrows(
+ StreamsException.class, // should not crash with NullPointerException
+ () -> collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context)
+ );
+ }
+ }
+
+ @Test
+ public void shouldNotFailIfContextIsNotAvailableOnSendError() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(new RuntimeException("Kaboom!")),
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ collector.send(
+ topic,
+ "key",
+ "val",
+ null,
+ 0,
+ null,
+ stringSerializer,
+ stringSerializer,
+ sinkNodeName,
+ null // pass `null` context for testing
+ );
+ }
+
+ @Test
+ public void shouldNotFailIfRecordContextIsNotAvailableOnSendError() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(new RuntimeException("Kaboom!")),
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ // RecordContext is null when writing into a changelog topic
+ context.setRecordContext(null);
+ collector.send(topic, "key", "val", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+ }
+
@Test
public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExceptionHandlerReturnsNull() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {