KAFKA-16448: Add timestamp to error handler context (#17054)

Part of KIP-1033.

Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Sebastien Viale 2024-09-05 17:42:52 +02:00 committed by GitHub
parent 190df07ace
commit b4f47aeff5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 72 additions and 19 deletions

View File

@ -18,8 +18,14 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
/**
* This interface allows user code to inspect the context of a record that has failed processing.
@ -102,4 +108,31 @@ public interface ErrorHandlerContext {
* @return the task ID
*/
TaskId taskId();
/**
* Return the current timestamp.
*
* <p> If it is triggered while processing a record streamed from the source processor,
* timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
* Note, that an upstream {@link Processor} might have set a new timestamp by calling
* {@link ProcessorContext#forward(Record) forward(record.withTimestamp(...))}.
* In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
* to guarantee deterministic results.
*
* <p> If it is triggered while processing a record generated not from the source processor (for example,
* if this method is invoked from the punctuate call):
* <ul>
* <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is defined as the current task's stream time,
* which is defined as the largest timestamp of any record processed by the task
* <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is defined the current system time
* </ul>
*
* <p> If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
* current rawRecord
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}
*
* @return the timestamp
*/
long timestamp();
}

View File

@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private final long timestamp;
private ProcessorContext processorContext;
public DefaultErrorHandlerContext(final ProcessorContext processorContext,
@ -41,7 +43,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
final long offset,
final Headers headers,
final String processorNodeId,
final TaskId taskId) {
final TaskId taskId,
final long timestamp) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
@ -49,6 +52,7 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
this.processorNodeId = processorNodeId;
this.taskId = taskId;
this.processorContext = processorContext;
this.timestamp = timestamp;
}
@Override
@ -81,6 +85,11 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
return taskId;
}
@Override
public long timestamp() {
return timestamp;
}
@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data

View File

@ -211,7 +211,8 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());
internalProcessorContext.taskId(),
internalProcessorContext.timestamp());
final ProcessingExceptionHandler.ProcessingHandlerResponse response;
try {

View File

@ -309,7 +309,8 @@ public class RecordCollectorImpl implements RecordCollector {
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId
taskId,
context.recordContext().timestamp()
);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
@ -406,7 +407,8 @@ public class RecordCollectorImpl implements RecordCollector {
context.recordContext().offset(),
context.recordContext().headers(),
processorNodeId,
taskId
taskId,
context.recordContext().timestamp()
);
final ProductionExceptionHandlerResponse response;

View File

@ -92,7 +92,8 @@ public class RecordDeserializer {
rawRecord.offset(),
rawRecord.headers(),
sourceNodeName,
processorContext.taskId());
processorContext.taskId(),
rawRecord.timestamp());
final DeserializationHandlerResponse response;
try {

View File

@ -946,7 +946,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
recordContext.offset(),
recordContext.headers(),
node.name(),
id()
id(),
recordContext.timestamp()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;

View File

@ -61,6 +61,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ProcessingExceptionHandlerIntegrationTest {
private final String threadId = Thread.currentThread().getName();
private static final Instant TIMESTAMP = Instant.now();
@Test
public void shouldFailWhenProcessingExceptionOccursIfExceptionHandlerReturnsFail() {
final List<KeyValue<String, String>> events = Arrays.asList(
@ -71,7 +73,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
);
final List<KeyValueTimestamp<String, String>> expectedProcessedRecords = Collections.singletonList(
new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0)
new KeyValueTimestamp<>("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli())
);
final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
@ -90,7 +92,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
final StreamsException exception = assertThrows(StreamsException.class,
() -> inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO));
() -> inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO));
assertTrue(exception.getMessage().contains("Exception caught in process. "
+ "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, "
@ -118,10 +120,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
);
final List<KeyValueTimestamp<String, String>> expectedProcessedRecords = Arrays.asList(
new KeyValueTimestamp<>("ID123-1", "ID123-A1", 0),
new KeyValueTimestamp<>("ID123-3", "ID123-A3", 0),
new KeyValueTimestamp<>("ID123-4", "ID123-A4", 0),
new KeyValueTimestamp<>("ID123-6", "ID123-A6", 0)
new KeyValueTimestamp<>("ID123-1", "ID123-A1", TIMESTAMP.toEpochMilli()),
new KeyValueTimestamp<>("ID123-3", "ID123-A3", TIMESTAMP.toEpochMilli()),
new KeyValueTimestamp<>("ID123-4", "ID123-A4", TIMESTAMP.toEpochMilli()),
new KeyValueTimestamp<>("ID123-6", "ID123-A6", TIMESTAMP.toEpochMilli())
);
final MockProcessorSupplier<String, String, Void, Void> processor = new MockProcessorSupplier<>();
@ -138,7 +140,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
inputTopic.pipeKeyValueList(events, Instant.EPOCH, Duration.ZERO);
inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
assertEquals(expectedProcessedRecords.size(), processor.theCapturedProcessor().processed().size());
assertIterableEquals(expectedProcessedRecords, processor.theCapturedProcessor().processed());
@ -176,10 +178,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, TIMESTAMP));
assertTrue(e.getMessage().contains("Exception caught in process. "
+ "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=TOPIC_NAME, "
+ "partition=0, offset=1"));
@ -212,10 +214,10 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
inputTopic.pipeInput(eventFalse.key, eventFalse.value, Instant.EPOCH);
inputTopic.pipeInput(eventFalse.key, eventFalse.value, TIMESTAMP);
assertFalse(isExecuted.get());
}
}
@ -245,7 +247,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@ -281,7 +283,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new StringSerializer());
isExecuted.set(false);
inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
inputTopic.pipeInput(event.key, event.value, TIMESTAMP);
assertTrue(isExecuted.get());
isExecuted.set(false);
final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
@ -328,6 +330,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
}

View File

@ -356,6 +356,7 @@ public class ProcessorNodeTest {
assertEquals(internalProcessorContext.offset(), context.offset());
assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(internalProcessorContext.timestamp(), context.timestamp());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertInstanceOf(RuntimeException.class, exception);

View File

@ -1914,6 +1914,7 @@ public class RecordCollectorTest {
assertEquals(expectedContext.recordContext().offset(), context.offset());
assertEquals(expectedProcessorNodeId, context.processorNodeId());
assertEquals(expectedTaskId, context.taskId());
assertEquals(expectedContext.recordContext().timestamp(), context.timestamp());
assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());
}

View File

@ -280,6 +280,7 @@ public class RecordDeserializerTest {
assertEquals(expectedRecord.offset(), context.offset());
assertEquals(expectedProcessorNodeId, context.processorNodeId());
assertEquals(expectedTaskId, context.taskId());
assertEquals(expectedRecord.timestamp(), context.timestamp());
assertEquals(expectedRecord, record);
assertInstanceOf(RuntimeException.class, exception);
assertEquals("KABOOM!", exception.getMessage());