KAFKA-17527: Fix NPE for null RecordContext (#17169)

Reviewers: Bruno Cadonna <bruno@confluent.io>
Author: Matthias J. Sax, 2024-09-13 16:34:15 -07:00 (committed by GitHub)
Parent: 45d040d881
Commit: 6610a4d46f
GPG Key ID: B5690EEEBB952194
4 changed files with 193 additions and 58 deletions

File: ErrorHandlerContext.java

@@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Record;
/**
* This interface allows user code to inspect the context of a record that has failed processing.
* This interface allows user code to inspect the context of a record that has failed during processing.
*
* <p> {@code ErrorHandlerContext} instances are passed into {@link DeserializationExceptionHandler},
* {@link ProcessingExceptionHandler}, or {@link ProductionExceptionHandler} dependent on what error occurred.
*/
public interface ErrorHandlerContext {
/**
@@ -42,6 +45,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid topic name, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no topic name is available.
*
* @return the topic name
*/
@@ -58,6 +63,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid partition ID, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no partition is available.
*
* @return the partition ID
*/
@@ -74,6 +81,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide a valid offset, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no offset is available.
*
* @return the offset
*/
@@ -90,6 +99,8 @@ public interface ErrorHandlerContext {
* {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
* (and siblings), that do not always guarantee to provide valid headers, as they might be
* executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
* Additionally, when writing into a changelog topic, there is no associated input record,
* and thus no headers are available.
*
* @return the headers
*/
@@ -110,7 +121,10 @@ public interface ErrorHandlerContext {
TaskId taskId();
/**
* Return the current timestamp.
* Return the current timestamp; could be {@code -1} if it is not available.
*
* <p> For example, when writing into a changelog topic, there is no associated input record,
* and thus no timestamp is available.
*
* <p> If it is triggered while processing a record streamed from the source processor,
* timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
@@ -129,8 +143,7 @@ public interface ErrorHandlerContext {
* </ul>
*
* <p> If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
* current rawRecord
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}
* current rawRecord {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
*
* @return the timestamp
*/
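
The updated contract above means handler implementations must treat these context fields as optional: for a record written into a changelog topic there is no associated input record, so topic() may return null and partition(), offset(), and timestamp() may return -1. Below is a minimal sketch, using a hypothetical handler class name, of a ProductionExceptionHandler that logs a failure without assuming any of these fields carry a valid value.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Illustrative sketch only: a handler that tolerates a missing record context,
// e.g. when the failed write targeted a changelog topic.
public class ContextAwareProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // topic() may be null and partition()/offset()/timestamp() may be -1
        // when no input record is associated with the failed write
        final String topic = context.topic() != null ? context.topic() : "<unknown>";
        System.err.printf("Failed to produce to %s (partition=%d, offset=%d, timestamp=%d): %s%n",
            topic, context.partition(), context.offset(), context.timestamp(), exception.getMessage());
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}

Whether to return FAIL or CONTINUE is application-specific; the point is only that none of the context accessors can be assumed to be populated.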

File: DefaultErrorHandlerContext.java

@@ -35,7 +35,7 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final TaskId taskId;
private final long timestamp;
private ProcessorContext processorContext;
private final ProcessorContext processorContext;
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
final String topic,

File: RecordCollectorImpl.java

@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
@@ -46,6 +47,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -252,40 +254,53 @@ public class RecordCollectorImpl implements RecordCollector {
final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
// if there's already an exception record, skip logging offsets or new exceptions
if (sendException.get() != null) {
return;
}
if (exception == null) {
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
if (metadata.offset() >= 0L) {
offsets.put(tp, metadata.offset());
} else {
log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
try {
// if there's already an exception record, skip logging offsets or new exceptions
if (sendException.get() != null) {
return;
}
if (!topic.endsWith("-changelog")) {
// we may not have created a sensor during initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the sensor for this topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
if (exception == null) {
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
if (metadata.offset() >= 0L) {
offsets.put(tp, metadata.offset());
} else {
log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
}
if (!topic.endsWith("-changelog")) {
// we may not have created a sensor during initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the sensor for this topic if absent
final Sensor topicProducedSensor = producedSensorByTopic.computeIfAbsent(
topic,
context.metrics()
)
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
topic,
context.metrics()
)
);
final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
topicProducedSensor.record(
bytesProduced,
context.currentSystemTimeMs()
);
}
} else {
recordSendError(
topic,
exception,
serializedRecord,
context,
processorNodeId
);
final long bytesProduced = producerRecordSizeInBytes(serializedRecord);
topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
}
} else {
recordSendError(topic, exception, serializedRecord, context, processorNodeId);
// KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default
log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
// KAFKA-7510 only put message key and value in TRACE level log so we don't leak data by default
log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
}
} catch (final RuntimeException fatal) {
sendException.set(new StreamsException("Producer.send `Callback` failed", fatal));
}
});
}
@@ -302,22 +317,17 @@ public class RecordCollectorImpl implements RecordCollector {
final RuntimeException serializationException) {
log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId,
context.recordContext().timestamp()
);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
final ProductionExceptionHandlerResponse response;
try {
response = Objects.requireNonNull(
productionExceptionHandler.handleSerializationException(errorHandlerContext, record, serializationException, origin),
productionExceptionHandler.handleSerializationException(
errorHandlerContext(context, processorNodeId),
record,
serializationException,
origin
),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {
@@ -325,7 +335,7 @@ public class RecordCollectorImpl implements RecordCollector {
String.format(
"Production error callback failed after serialization error for record %s: %s",
origin.toString().toLowerCase(Locale.ROOT),
errorHandlerContext
errorHandlerContext(context, processorNodeId)
),
serializationException
);
@@ -352,6 +362,33 @@ public class RecordCollectorImpl implements RecordCollector {
droppedRecordsSensor.record();
}
private DefaultErrorHandlerContext errorHandlerContext(final InternalProcessorContext<Void, Void> context,
final String processorNodeId) {
final RecordContext recordContext = context != null ? context.recordContext() : null;
return recordContext != null ?
new DefaultErrorHandlerContext(
context,
recordContext.topic(),
recordContext.partition(),
recordContext.offset(),
recordContext.headers(),
processorNodeId,
taskId,
recordContext.timestamp()
) :
new DefaultErrorHandlerContext(
context,
null,
-1,
-1,
new RecordHeaders(),
processorNodeId,
taskId,
-1L
);
}
private <KV> StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final KV keyOrValue,
@@ -400,21 +437,14 @@ public class RecordCollectorImpl implements RecordCollector {
"`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
} else {
final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
context.recordContext().topic(),
context.recordContext().partition(),
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId,
context.recordContext().timestamp()
);
final ProductionExceptionHandlerResponse response;
try {
response = Objects.requireNonNull(
productionExceptionHandler.handle(errorHandlerContext, serializedRecord, productionException),
productionExceptionHandler.handle(
errorHandlerContext(context, processorNodeId),
serializedRecord,
productionException
),
"Invalid ProductionExceptionHandler response."
);
} catch (final RuntimeException fatalUserException) {

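The callback changes above follow a general pattern: a RuntimeException thrown inside a producer Callback (for example by a user-supplied exception handler) must not escape into the producer's I/O thread, so it is captured into a shared reference and rethrown later from the processing thread. A self-contained sketch of that pattern, with illustrative class and method names rather than the ones used by Kafka Streams, is shown below.

import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.StreamsException;

// Illustrative sketch only: defer exceptions raised in a producer callback
// and surface them from the calling thread on the next send or flush.
class GuardedSender {
    private final Producer<byte[], byte[]> producer;
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    GuardedSender(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        maybeThrow(); // surface an earlier failure before sending more data
        final Callback callback = (metadata, exception) -> {
            try {
                if (exception != null) {
                    // error handling (user handlers, metrics, logging) runs here and may itself throw
                    sendException.compareAndSet(null, new StreamsException("Error sending record", exception));
                }
            } catch (final RuntimeException fatal) {
                // never let a RuntimeException escape into the producer I/O thread
                sendException.compareAndSet(null, new StreamsException("Producer.send `Callback` failed", fatal));
            }
        };
        producer.send(record, callback);
    }

    void flush() {
        producer.flush();
        maybeThrow(); // rethrow any failure captured by the callback
    }

    private void maybeThrow() {
        final RuntimeException exception = sendException.getAndSet(null);
        if (exception != null) {
            throw exception;
        }
    }
}

In RecordCollectorImpl the same idea is applied directly in the send callback, so a failure inside the production exception handler surfaces to the application as a StreamsException instead of being lost in the producer's I/O thread.
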
File: RecordCollectorTest.java

@@ -1580,6 +1580,98 @@ public class RecordCollectorTest {
}
}
@Test
public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
assertThrows(
StreamsException.class, // should not crash with NullPointerException
() -> collector.send(
topic,
"key",
"val",
null,
0,
null,
errorSerializer,
stringSerializer,
sinkNodeName,
null // pass `null` context for testing
)
);
}
}
@Test
public void shouldNotFailIfRecordContextIsNotAvailableOnSerializationError() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
productionExceptionHandler,
streamsMetrics,
topology
);
// RecordContext is null when writing into a changelog topic
context.setRecordContext(null);
assertThrows(
StreamsException.class, // should not crash with NullPointerException
() -> collector.send(topic, "key", "val", null, 0, null, errorSerializer, stringSerializer, sinkNodeName, context)
);
}
}
@Test
public void shouldNotFailIfContextIsNotAvailableOnSendError() {
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducerOnSend(new RuntimeException("Kaboom!")),
productionExceptionHandler,
streamsMetrics,
topology
);
collector.send(
topic,
"key",
"val",
null,
0,
null,
stringSerializer,
stringSerializer,
sinkNodeName,
null // pass `null` context for testing
);
}
@Test
public void shouldNotFailIfRecordContextIsNotAvailableOnSendError() {
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducerOnSend(new RuntimeException("Kaboom!")),
productionExceptionHandler,
streamsMetrics,
topology
);
// RecordContext is null when writing into a changelog topic
context.setRecordContext(null);
collector.send(topic, "key", "val", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
}
@Test
public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExceptionHandlerReturnsNull() {
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {