KAFKA-16505: Add source raw key and value (#18739)

This PR is part of KIP-1034.

It adds support for the source raw key and the source raw value in the
`ErrorHandlerContext`, which is required for the dead letter queue (DLQ) routing
implemented in https://github.com/apache/kafka/pull/17942.

Reviewers: Bruno Cadonna <cadonna@apache.org>

Co-authored-by: Damien Gasparina <d.gasparina@gmail.com>
Loïc GREFFIER 2025-06-05 09:35:03 +01:00, committed by GitHub
parent 8eb84399f6
commit 3edb406f98
24 changed files with 464 additions and 35 deletions
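
For illustration, here is a minimal sketch of how a custom `ProcessingExceptionHandler` could use the new `sourceRawKey()` / `sourceRawValue()` accessors to forward the original consumed record to a dead letter queue. The handler class, the `dlq.topic` config key, and the producer wiring are assumptions made for this example; the actual DLQ routing lives in the follow-up PR linked above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

import java.util.HashMap;
import java.util.Map;

public class DeadLetterQueueProcessingExceptionHandler implements ProcessingExceptionHandler {

    private Producer<byte[], byte[]> dlqProducer;
    private String dlqTopic;

    @Override
    public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                                       final Record<?, ?> record,
                                                                       final Exception exception) {
        // sourceRawKey()/sourceRawValue() expose the bytes that were consumed from the source
        // (or repartition) topic, so the failed input can be replayed later as-is.
        // They are null when the failing record was forwarded by a punctuator, hence the guard.
        if (context.sourceRawKey() != null || context.sourceRawValue() != null) {
            dlqProducer.send(new ProducerRecord<>(dlqTopic, null, context.timestamp(),
                context.sourceRawKey(), context.sourceRawValue(), context.headers()));
        }
        return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object topic = configs.get("dlq.topic"); // hypothetical config key for this example
        dlqTopic = topic != null ? topic.toString() : "dead-letter-queue";
        final Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get("bootstrap.servers"));
        // Production code would also need to manage the producer lifecycle (flush/close).
        dlqProducer = new KafkaProducer<>(producerConfig, new ByteArraySerializer(), new ByteArraySerializer());
    }
}
```

Such a handler would be registered via `StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG`, exactly as the integration test below does with its assertion handler.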


@@ -16,14 +16,17 @@
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
@@ -31,14 +34,22 @@ import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
@@ -48,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -385,6 +397,131 @@ public class ProcessingExceptionHandlerIntegrationTest {
}
}
static Stream<Arguments> sourceRawRecordTopologyTestCases() {
// Validate source raw key and source raw value for fully stateless topology
final List<ProducerRecord<String, String>> statelessTopologyEvent = List.of(new ProducerRecord<>("TOPIC_NAME", "ID123-1", "ID123-A1"));
final StreamsBuilder statelessTopologyBuilder = new StreamsBuilder();
statelessTopologyBuilder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((key, value) -> "newKey")
.mapValues(value -> {
throw new RuntimeException("Error");
});
// Validate source raw key and source raw value for processing exception in aggregator with caching enabled
final List<ProducerRecord<String, String>> cacheAggregateExceptionInAggregatorEvent = List.of(new ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
final StreamsBuilder cacheAggregateExceptionInAggregatorTopologyBuilder = new StreamsBuilder();
cacheAggregateExceptionInAggregatorTopologyBuilder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(() -> "initialValue",
(key, value, aggregate) -> {
throw new RuntimeException("Error");
},
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
.withCachingEnabled());
// Validate source raw key and source raw value for processing exception after aggregation with caching enabled
final List<ProducerRecord<String, String>> cacheAggregateExceptionAfterAggregationEvent = List.of(new ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
final StreamsBuilder cacheAggregateExceptionAfterAggregationTopologyBuilder = new StreamsBuilder();
cacheAggregateExceptionAfterAggregationTopologyBuilder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(() -> "initialValue",
(key, value, aggregate) -> value,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
.withCachingEnabled())
.mapValues(value -> {
throw new RuntimeException("Error");
});
// Validate source raw key and source raw value for processing exception after aggregation with caching disabled
final List<ProducerRecord<String, String>> noCacheAggregateExceptionAfterAggregationEvents = List.of(new ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-1", "ID123-A1"));
final StreamsBuilder noCacheAggregateExceptionAfterAggregationTopologyBuilder = new StreamsBuilder();
noCacheAggregateExceptionAfterAggregationTopologyBuilder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> "ID123-1", Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(() -> "initialValue",
(key, value, aggregate) -> value,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
.withCachingDisabled())
.mapValues(value -> {
throw new RuntimeException("Error");
});
// Validate source raw key and source raw value for processing exception after table creation with caching enabled
final List<ProducerRecord<String, String>> cacheTableEvents = List.of(new ProducerRecord<>("TOPIC_NAME", "ID123-1", "ID123-A1"));
final StreamsBuilder cacheTableTopologyBuilder = new StreamsBuilder();
cacheTableTopologyBuilder
.table("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
.withCachingEnabled())
.mapValues(value -> {
throw new RuntimeException("Error");
});
// Validate source raw key and source raw value for processing exception in join
final List<ProducerRecord<String, String>> joinEvents = List.of(
new ProducerRecord<>("TOPIC_NAME_2", "INITIAL-KEY123-1", "ID123-A1"),
new ProducerRecord<>("TOPIC_NAME", "INITIAL-KEY123-2", "ID123-A1")
);
final StreamsBuilder joinTopologyBuilder = new StreamsBuilder();
joinTopologyBuilder
.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((key, value) -> "ID123-1")
.leftJoin(joinTopologyBuilder.stream("TOPIC_NAME_2", Consumed.with(Serdes.String(), Serdes.String()))
.selectKey((key, value) -> "ID123-1"),
(key, left, right) -> {
throw new RuntimeException("Error");
},
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)),
StreamJoined.with(
Serdes.String(), Serdes.String(), Serdes.String())
.withName("join-rekey")
.withStoreName("join-store"));
return Stream.of(
Arguments.of(statelessTopologyEvent, statelessTopologyBuilder.build()),
Arguments.of(cacheAggregateExceptionInAggregatorEvent, cacheAggregateExceptionInAggregatorTopologyBuilder.build()),
Arguments.of(cacheAggregateExceptionAfterAggregationEvent, cacheAggregateExceptionAfterAggregationTopologyBuilder.build()),
Arguments.of(noCacheAggregateExceptionAfterAggregationEvents, noCacheAggregateExceptionAfterAggregationTopologyBuilder.build()),
Arguments.of(cacheTableEvents, cacheTableTopologyBuilder.build()),
Arguments.of(joinEvents, joinTopologyBuilder.build())
);
}
@ParameterizedTest
@MethodSource("sourceRawRecordTopologyTestCases")
public void shouldVerifySourceRawKeyAndSourceRawValuePresentOrNotInErrorHandlerContext(final List<ProducerRecord<String, String>> events,
final Topology topology) {
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
AssertSourceRawRecordProcessingExceptionHandlerMockTest.class);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, properties, Instant.ofEpochMilli(0L))) {
for (final ProducerRecord<String, String> event : events) {
final TestInputTopic<String, String> inputTopic = driver.createInputTopic(event.topic(), new StringSerializer(), new StringSerializer());
final String key = event.key();
final String value = event.value();
if (event.topic().equals("TOPIC_NAME")) {
assertThrows(StreamsException.class, () -> inputTopic.pipeInput(key, value, TIMESTAMP));
} else {
inputTopic.pipeInput(event.key(), event.value(), TIMESTAMP);
}
}
}
}
public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
@@ -422,10 +559,28 @@ public class ProcessingExceptionHandlerIntegrationTest {
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
assertEquals("KSTREAM-PROCESSOR-0000000003", context.processorNodeId());
assertTrue(Arrays.equals("ID123-2-ERR".getBytes(), context.sourceRawKey())
|| Arrays.equals("ID123-5-ERR".getBytes(), context.sourceRawKey()));
assertTrue(Arrays.equals("ID123-A2".getBytes(), context.sourceRawValue())
|| Arrays.equals("ID123-A5".getBytes(), context.sourceRawValue()));
assertEquals(TIMESTAMP.toEpochMilli(), context.timestamp());
assertTrue(exception.getMessage().contains("Exception should be handled by processing exception handler"));
}
public static class AssertSourceRawRecordProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
assertEquals("ID123-1", Serdes.String().deserializer().deserialize("topic", context.sourceRawKey()));
assertEquals("ID123-A1", Serdes.String().deserializer().deserialize("topic", context.sourceRawValue()));
return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
// No-op
}
}
/**
* Metric name for dropped records total.
*


@@ -147,4 +147,38 @@ public interface ErrorHandlerContext {
* @return The timestamp.
*/
long timestamp();
/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key is the one sent
* to the repartition topic.
*
* <p> Always returns {@code null} if this method is invoked within a
* {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} call.
*
* @return the raw bytes of the key of the source message
*/
byte[] sourceRawKey();
/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value is the one sent
* to the repartition topic.
*
* <p> Always returns {@code null} if this method is invoked within a
* {@code ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception)} call.
*
* @return the raw bytes of the value of the source message
*/
byte[] sourceRawValue();
}
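
As a usage note, the sketch below shows a handler that honors the nullability contract documented above: the raw bytes are only present when the failure was triggered by an actual input record. The class name and the SLF4J-based logging are illustrative assumptions, not part of this change.

```java
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class LogRawSourceProcessingExceptionHandler implements ProcessingExceptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LogRawSourceProcessingExceptionHandler.class);

    @Override
    public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                                       final Record<?, ?> record,
                                                                       final Exception exception) {
        final byte[] rawKey = context.sourceRawKey();
        final byte[] rawValue = context.sourceRawValue();
        if (rawKey == null && rawValue == null) {
            // Punctuation-forwarded record: there is no source topic record to report.
            LOG.warn("Processing error on punctuation-forwarded record in task {}", context.taskId(), exception);
        } else {
            LOG.warn("Processing error on record from {}-{}@{}: raw key {} bytes, raw value {} bytes",
                context.topic(), context.partition(), context.offset(),
                rawKey == null ? 0 : rawKey.length,
                rawValue == null ? 0 : rawValue.length,
                exception);
        }
        return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed
    }
}
```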


@@ -33,6 +33,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final Headers headers;
private final String processorNodeId;
private final TaskId taskId;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;
private final long timestamp;
private final ProcessorContext processorContext;
@@ -44,7 +46,9 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
final Headers headers,
final String processorNodeId,
final TaskId taskId,
final long timestamp,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
@@ -53,6 +57,8 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
this.taskId = taskId;
this.processorContext = processorContext;
this.timestamp = timestamp;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}
@Override
@@ -90,6 +96,14 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
return timestamp;
}
@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}
@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}
@Override
public String toString() {
// we do exclude headers on purpose, to not accidentally log user data


@@ -110,4 +110,31 @@ public interface RecordContext {
*/
Headers headers();
/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key is the one sent
* to the repartition topic.
*
* @return the raw bytes of the key of the source message
*/
byte[] sourceRawKey();
/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value is the one sent
* to the repartition topic.
*
* @return the raw bytes of the value of the source message
*/
byte[] sourceRawValue();
}


@@ -260,7 +260,10 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object,
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);
}
if (childName == null) {


@@ -215,7 +215,9 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.recordContext().headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
internalProcessorContext.recordContext().timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;


@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -37,6 +38,8 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private byte[] sourceRawKey;
private byte[] sourceRawValue;
public ProcessorRecordContext(final long timestamp,
final long offset,
@@ -48,6 +51,24 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = null;
this.sourceRawValue = null;
}
public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
}
@Override
@@ -75,6 +96,16 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
return headers;
}
@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}
@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}
public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
@@ -176,6 +207,11 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}
public void freeRawRecord() {
this.sourceRawKey = null;
this.sourceRawValue = null;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -189,7 +225,9 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
offset == that.offset &&
partition == that.partition &&
Objects.equals(topic, that.topic) &&
Objects.equals(headers, that.headers) &&
Arrays.equals(sourceRawKey, that.sourceRawKey) &&
Arrays.equals(sourceRawValue, that.sourceRawValue);
}
/**


@@ -259,6 +259,10 @@ public class RecordCollectorImpl implements RecordCollector {
final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
// As many records could be in flight,
// free the raw records in the context to reduce memory pressure
freeRawInputRecordFromContext(context);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging offsets or new exceptions
@@ -311,6 +315,12 @@
});
}
private static void freeRawInputRecordFromContext(final InternalProcessorContext<Void, Void> context) {
if (context != null && context.recordContext() != null) {
context.recordContext().freeRawRecord();
}
}
private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
final String topic,
final K key,
@@ -388,7 +398,9 @@
recordContext.headers(),
processorNodeId,
taskId,
recordContext.timestamp(),
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
) :
new DefaultErrorHandlerContext(
context,
@@ -398,7 +410,9 @@
new RecordHeaders(),
processorNodeId,
taskId,
-1L,
null,
null
);
}


@@ -95,7 +95,10 @@ public class RecordDeserializer {
rawRecord.headers(),
sourceNodeName,
processorContext.taskId(),
rawRecord.timestamp(),
rawRecord.key(),
rawRecord.value()
);
final DeserializationHandlerResponse response;
try {


@@ -243,7 +243,7 @@ public class RecordQueue {
lastCorruptedRecord = raw;
continue;
}
headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value());
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}


@@ -23,8 +23,22 @@ import java.util.Optional;
public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
private final byte[] rawKey;
private final byte[] rawValue;
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
super(record, timestamp);
this.rawKey = null;
this.rawValue = null;
}
public StampedRecord(final ConsumerRecord<?, ?> record,
final long timestamp,
final byte[] rawKey,
final byte[] rawValue) {
super(record, timestamp);
this.rawKey = rawKey;
this.rawValue = rawValue;
}
public String topic() {
@@ -55,8 +69,26 @@ public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
return value.headers();
}
public byte[] rawKey() {
return rawKey;
}
public byte[] rawValue() {
return rawValue;
}
@Override
public String toString() {
return value.toString() + ", timestamp = " + timestamp;
}
@Override
public boolean equals(final Object other) {
return super.equals(other);
}
@Override
public int hashCode() {
return super.hashCode();
}
}


@@ -856,7 +856,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
record.offset(),
record.partition(),
record.topic(),
record.headers(),
record.rawKey(),
record.rawValue()
);
updateProcessorContext(currNode, wallClockTime, recordContext);
@@ -938,7 +940,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
recordContext.headers(),
node.name(),
id(),
recordContext.timestamp(),
recordContext.sourceRawKey(),
recordContext.sourceRawValue()
);
final ProcessingExceptionHandler.ProcessingHandlerResponse response;


@@ -277,7 +277,9 @@ public class CachingKeyValueStore
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
)
);


@@ -140,7 +140,9 @@ class CachingSessionStore
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry);


@@ -158,7 +158,9 @@ class CachingWindowStore
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry);


@@ -32,7 +32,7 @@ class LRUCacheEntry {
LRUCacheEntry(final byte[] value) {
this(value, new RecordHeaders(), false, -1, -1, -1, "", null, null);
}
LRUCacheEntry(final byte[] value,
@@ -41,8 +41,18 @@
final long offset,
final long timestamp,
final int partition,
final String topic,
final byte[] rawKey,
final byte[] rawValue) {
final ProcessorRecordContext context = new ProcessorRecordContext(
timestamp,
offset,
partition,
topic,
headers,
rawKey,
rawValue
);
this.record = new ContextualRecord(
value,


@@ -261,7 +261,9 @@ class TimeOrderedCachingWindowStore
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
internalContext.recordContext().topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
// Put to index first so that base can be evicted later
@@ -279,7 +281,9 @@ class TimeOrderedCachingWindowStore
internalContext.recordContext().offset(),
internalContext.recordContext().timestamp(),
internalContext.recordContext().partition(),
"",
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
final Bytes indexKey = KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
internalContext.cache().put(cacheName, indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);


@@ -80,6 +80,8 @@ public class ProcessorNodeTest {
private static final String NAME = "name";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final byte[] RAW_KEY = KEY.getBytes();
private static final byte[] RAW_VALUE = VALUE.getBytes();
@Test
public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
@@ -331,7 +333,9 @@
OFFSET,
PARTITION,
TOPIC,
new RecordHeaders(),
RAW_KEY,
RAW_VALUE));
when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME));
return internalProcessorContext;
@@ -359,6 +363,9 @@
assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp());
assertEquals(internalProcessorContext.recordContext().sourceRawKey(), context.sourceRawKey());
assertEquals(internalProcessorContext.recordContext().sourceRawValue(), context.sourceRawValue());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
assertInstanceOf(RuntimeException.class, exception);


@@ -100,6 +100,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -1890,6 +1892,68 @@ public class RecordCollectorTest {
));
}
@Test
public void shouldFreeRawRecordsInContextBeforeSending() {
final KafkaException exception = new KafkaException("KABOOM!");
final byte[][] sourceRawData = new byte[][]{new byte[]{}, new byte[]{}};
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandler() {
@Override
public void configure(final Map<String, ?> configs) {
}
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) {
sourceRawData[0] = context.sourceRawKey();
sourceRawData[1] = context.sourceRawValue();
return ProductionExceptionHandlerResponse.CONTINUE;
}
},
streamsMetrics,
topology
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner);
assertNull(sourceRawData[0]);
assertNull(sourceRawData[1]);
}
@Test
public void shouldHaveRawDataDuringExceptionInSerialization() {
final byte[][] sourceRawData = new byte[][]{new byte[]{}, new byte[]{}};
try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(
new ProductionExceptionHandler() {
@Override
@SuppressWarnings({"rawtypes", "unused"})
public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception, final SerializationExceptionOrigin origin) {
sourceRawData[0] = context.sourceRawKey();
sourceRawData[1] = context.sourceRawValue();
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
);
collector.initialize();
collector.send(topic, "hello", "val", null, 0, null, (Serializer) errorSerializer, stringSerializer, sinkNodeName, context);
assertNotNull(sourceRawData[0]);
assertNotNull(sourceRawData[1]);
}
}
private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) {
return new RecordCollectorImpl(
logContext,


@@ -44,6 +44,8 @@ public class NamedCacheTest {
private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
private NamedCache cache;
private final byte[] rawKey = new byte[]{0};
private final byte[] rawValue = new byte[]{0};
@BeforeEach
public void setUp() {
@@ -64,7 +66,7 @@
final byte[] key = stringStringKeyValue.key.getBytes();
final byte[] value = stringStringKeyValue.value.getBytes();
cache.put(Bytes.wrap(key),
new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, "", rawKey, rawValue));
final LRUCacheEntry head = cache.first();
final LRUCacheEntry tail = cache.last();
assertEquals(new String(head.value()), stringStringKeyValue.value);
@@ -152,9 +154,9 @@
@Test
public void shouldFlushDirtEntriesOnEviction() {
final List<ThreadCache.DirtyEntry> flushed = new ArrayList<>();
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "", rawKey, rawValue));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, headers, true, 0, 0, 0, "", rawKey, rawValue));
cache.setListener(flushed::addAll);
@@ -176,16 +178,16 @@
@Test
public void shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry() {
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "", rawKey, rawValue));
assertThrows(IllegalStateException.class, () -> cache.put(Bytes.wrap(new byte[]{0}),
new LRUCacheEntry(new byte[]{10}, new RecordHeaders(), false, 0, 0, 0, "", rawKey, rawValue)));
}
@Test
public void shouldRemoveDeletedValuesOnFlush() {
cache.setListener(dirty -> { /* no-op */ });
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(null, headers, true, 0, 0, 0, "", rawKey, rawValue));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}, new RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue));
cache.flush();
assertEquals(1, cache.size());
assertNotNull(cache.get(Bytes.wrap(new byte[]{1})));
@@ -193,7 +195,7 @@
@Test
public void shouldBeReentrantAndNotBreakLRU() {
final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue);
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
cache.put(Bytes.wrap(new byte[]{0}), dirty);
cache.put(Bytes.wrap(new byte[]{1}), clean);
@@ -236,7 +238,7 @@
@Test
public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey() {
final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, new RecordHeaders(), true, 0, 0, 0, "", rawKey, rawValue);
final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
final Bytes key = Bytes.wrap(new byte[] {3});
cache.setListener(dirty1 -> cache.put(key, clean));


@@ -48,6 +48,8 @@ public class ThreadCacheTest {
final String namespace2 = "0.2-namespace";
private final LogContext logContext = new LogContext("testCache ");
private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
private final byte[] rawKey = new byte[]{0};
private final byte[] rawValue = new byte[]{0};
@Test
public void basicPutGet() {
@@ -65,7 +67,7 @@
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
final byte[] value = kvToInsert.value.getBytes();
cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, "", rawKey, rawValue));
}
for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -98,7 +100,7 @@
final String keyStr = "K" + i;
final Bytes key = Bytes.wrap(keyStr.getBytes());
final byte[] value = new byte[valueSizeBytes];
cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, "", rawKey, rawValue));
}
@@ -176,7 +178,7 @@
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
final byte[] value = kvToInsert.value.getBytes();
cache.put(namespace, key, new LRUCacheEntry(value, new RecordHeaders(), true, 1, 1, 1, "", rawKey, rawValue));
}
for (int i = 0; i < expected.size(); i++) {
@@ -617,7 +619,7 @@
}
private LRUCacheEntry dirtyEntry(final byte[] key) {
return new LRUCacheEntry(key, new RecordHeaders(), true, -1, -1, -1, "", rawKey, rawValue);
}
private LRUCacheEntry cleanEntry(final byte[] key) {


@@ -938,7 +938,10 @@
context.recordContext().offset(),
context.recordContext().timestamp(),
context.recordContext().partition(),
"",
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
)
);
underlyingStore.put(key, value, 1);


@@ -944,7 +944,9 @@
context.recordContext().offset(),
context.recordContext().timestamp(),
context.recordContext().partition(),
"",
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
)
);


@@ -56,6 +56,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -244,7 +245,9 @@ public class InternalMockProcessorContext<KOut, VOut>
0,
0,
"topic",
new RecordHeaders(),
"sourceKey".getBytes(StandardCharsets.UTF_8),
"sourceValue".getBytes(StandardCharsets.UTF_8)
);
}