KAFKA-8830 make Record Headers available in onAcknowledgement (#17099)

Two sets of tests are added:
1. KafkaProducerTest
- when the send succeeds, both record.headers() and the headers passed to
onAcknowledgement are read-only
- when the send fails, record.headers() remains writable as before, while
the headers passed to onAcknowledgement are read-only
2. ProducerInterceptorsTest
- verify that both the old and the new onAcknowledgement methods are called
successfully (an illustrative interceptor sketch follows below)
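
For illustration only (not part of the commit): a minimal sketch of an
interceptor written against the new three-argument callback. The class name
and output are hypothetical; the sketch assumes the behavior introduced here,
namely that the Headers passed to onAcknowledgement are read-only and any
mutation attempt throws IllegalStateException.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class HeaderAuditInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
        // The producer hands over a read-only view; calling headers.add(...)
        // here would throw IllegalStateException.
        for (Header header : headers)
            System.out.println("acked with header key: " + header.key());
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}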

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Omnia Ibrahim
<o.g.h.ibrahim@gmail.com>, Matthias J. Sax <matthias@confluent.io>,
Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
Rich Chen 2025-04-21 09:01:55 -04:00 committed by GitHub
parent a04c2fed04
commit ae771d73d1
6 changed files with 185 additions and 29 deletions

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -1546,6 +1546,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final String recordLogString;
private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
private volatile TopicPartition topicPartition;
private final Headers headers;
private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
this.userCallback = userCallback;
@@ -1554,6 +1555,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
// whole lifetime of the batch.
// We don't want to have an NPE here, because the interceptors would not be notified (see .doSend).
topic = record != null ? record.topic() : null;
if (record != null) {
headers = record.headers();
} else {
headers = new RecordHeaders();
((RecordHeaders) headers).setReadOnly();
}
recordPartition = record != null ? record.partition() : null;
recordLogString = log.isTraceEnabled() && record != null ? record.toString() : "";
}
@@ -1563,7 +1570,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (metadata == null) {
metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
}
this.interceptors.onAcknowledgement(metadata, exception);
this.interceptors.onAcknowledgement(metadata, exception, headers);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
}
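
Not in the commit: a standalone sketch (hypothetical class name) showing what
the fallback branch above sets up. When the record is null, the callbacks
receive a fresh RecordHeaders sealed via setReadOnly(), so interceptors always
get a non-null, immutable Headers instance; sealing makes any later mutation
throw.

import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class SealedHeadersDemo {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add(new RecordHeader("trace-id", "abc123".getBytes()));

        headers.setReadOnly(); // the same call the producer makes above

        try {
            headers.add(new RecordHeader("late", "x".getBytes()));
        } catch (IllegalStateException e) {
            // RecordHeaders rejects mutation once it is read-only
            System.out.println("headers are sealed");
        }
    }
}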

clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java

@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.header.Headers;
/**
* A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
@@ -83,12 +84,37 @@ public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
* @param metadata The metadata for the record that was sent (i.e. the partition and offset).
* If an error occurred, metadata will contain only valid topic and maybe
* partition. If partition is not given in ProducerRecord and an error occurs
* before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
* before partition gets assigned, then partition will be set to {@link RecordMetadata#UNKNOWN_PARTITION}.
* The metadata may be null if the client passed null record to
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
void onAcknowledgement(RecordMetadata metadata, Exception exception);
default void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
/**
* This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
* it gets sent to the server.
* <p>
* This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
* throws an exception.
* <p>
* Any exception thrown by this method will be ignored by the caller.
* <p>
* This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
* Otherwise, sending of messages from other threads could be delayed.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset).
* If an error occurred, metadata will contain only valid topic and maybe
* partition. If partition is not given in ProducerRecord and an error occurs
* before partition gets assigned, then partition will be set to {@link RecordMetadata#UNKNOWN_PARTITION}.
* The metadata may be null if the client passed null record to
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* @param headers The headers for the record that was sent. It is read-only.
*/
default void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
onAcknowledgement(metadata, exception);
}
/**
* This is called when interceptor is closed
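
Not in the commit: because the original two-argument method is now a no-op
default and the new overload delegates to it, interceptors written against the
old signature keep working unchanged. A hypothetical legacy interceptor:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LegacyCountingInterceptor implements ProducerInterceptor<String, String> {
    private long acks = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // still invoked: the default three-argument onAcknowledgement
        // forwards to this legacy overload
        acks++;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}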

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java

@@ -22,6 +22,8 @@ import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.RecordBatch;
@@ -77,7 +79,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
/**
* This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
* it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
* it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception, Headers)}
* method for each interceptor.
*
* This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored.
@@ -85,11 +87,12 @@ public class ProducerInterceptors<K, V> implements Closeable {
* @param metadata The metadata for the record that was sent (i.e. the partition and offset).
* If an error occurred, metadata will only contain valid topic and maybe partition.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* @param headers The headers for the record that was sent
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
interceptorPlugin.get().onAcknowledgement(metadata, exception);
interceptorPlugin.get().onAcknowledgement(metadata, exception, headers);
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement callback", e);
@@ -99,7 +102,7 @@ public class ProducerInterceptors<K, V> implements Closeable {
/**
* This method is called when sending the record fails in {@link ProducerInterceptor#onSend
* (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
* (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception, Headers)}
* method for each interceptor
*
* @param record The record from client
@@ -110,14 +113,22 @@ public class ProducerInterceptors<K, V> implements Closeable {
public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
try {
Headers headers = record != null ? record.headers() : new RecordHeaders();
if (headers instanceof RecordHeaders && !((RecordHeaders) headers).isReadOnly()) {
// make a copy of the headers so we don't change the state of the original record's headers.
// the original headers stay writable because the client might want to mutate them before retrying.
RecordHeaders recordHeaders = (RecordHeaders) headers;
headers = new RecordHeaders(recordHeaders);
((RecordHeaders) headers).setReadOnly();
}
if (record == null && interceptTopicPartition == null) {
interceptorPlugin.get().onAcknowledgement(null, exception);
interceptorPlugin.get().onAcknowledgement(null, exception, headers);
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition = extractTopicPartition(record);
}
interceptorPlugin.get().onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1), exception);
RecordBatch.NO_TIMESTAMP, -1, -1), exception, headers);
}
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
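
Not in the commit: the copy-then-seal pattern used above keeps the caller's
record mutable for retries while interceptors see an immutable view. A
standalone sketch (hypothetical class name) of the same pattern:

import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderCopyDemo {
    public static void main(String[] args) {
        RecordHeaders original = new RecordHeaders();
        original.add(new RecordHeader("attempt", "1".getBytes()));

        // copy, then seal only the copy, as onSendError does above
        RecordHeaders sealedCopy = new RecordHeaders(original);
        sealedCopy.setReadOnly();

        // the caller can still mutate the original before retrying...
        original.add(new RecordHeader("attempt", "2".getBytes()));

        // ...while the copy handed to interceptors is immutable
        System.out.println(sealedCopy.isReadOnly()); // true
        System.out.println(original.isReadOnly());   // false
    }
}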

clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java

@@ -108,6 +108,10 @@ public class RecordHeaders implements Headers {
this.isReadOnly = true;
}
public boolean isReadOnly() {
return isReadOnly;
}
public Header[] toArray() {
return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[0]);
}

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@@ -48,7 +48,9 @@ import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -1084,13 +1086,14 @@ public class KafkaProducerTest {
@SuppressWarnings("unchecked")
@Test
public void testHeaders() {
public void testHeadersSuccess() {
doTestHeaders(Serializer.class);
}
private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForHeaders.class.getName());
Serializer<String> keySerializer = mock(serializerClassToMock);
Serializer<String> valueSerializer = mock(serializerClassToMock);
@@ -1119,7 +1122,9 @@ public class KafkaProducerTest {
producer.send(record, null);
//ensure headers are closed and cannot be mutated post send
assertThrows(IllegalStateException.class, () -> record.headers().add(new RecordHeader("test", "test".getBytes())));
RecordHeaders recordHeaders = (RecordHeaders) record.headers();
assertTrue(recordHeaders.isReadOnly());
assertThrows(IllegalStateException.class, () -> recordHeaders.add(new RecordHeader("test", "test".getBytes())));
//ensure existing headers are not changed, and last header for key is still original value
assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes());
@@ -1130,6 +1135,28 @@ public class KafkaProducerTest {
producer.close(Duration.ofMillis(0));
}
@Test
public void testHeadersFailure() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForHeaders.class.getName());
Serializer<String> keySerializer = mock(StringSerializer.class);
Serializer<String> valueSerializer = mock(StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
Future<RecordMetadata> future = producer.send(record, (recordMetadata, exception) -> { });
try {
TestUtils.assertFutureThrows(TimeoutException.class, future);
//ensure headers remain writable if the send fails
RecordHeaders recordHeaders = (RecordHeaders) record.headers();
assertFalse(recordHeaders.isReadOnly());
} finally {
producer.close(Duration.ofMillis(0));
}
}
@Test
public void closeShouldBeIdempotent() {
Properties producerProps = new Properties();
@@ -2500,6 +2527,29 @@ public class KafkaProducerTest {
}
}
public static class ProducerInterceptorForHeaders implements ProducerInterceptor<byte[], byte[]> {
@Override
public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
RecordHeaders recordHeaders = (RecordHeaders) headers;
// Ensure that the headers are read-only, whether the send succeeded or failed
assertTrue(recordHeaders.isReadOnly());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public static class ProducerInterceptorForClientId implements ProducerInterceptor<byte[], byte[]> {
@Override

clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java

@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;
@@ -95,13 +96,68 @@ public class ProducerInterceptorsTest {
}
}
private class AppendNewProducerInterceptor implements ProducerInterceptor<Integer, String> {
private final String appendStr;
private boolean throwExceptionOnSend = false;
private boolean throwExceptionOnAck = false;
public AppendNewProducerInterceptor(String appendStr) {
this.appendStr = appendStr;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
onSendCount++;
if (throwExceptionOnSend)
throw new KafkaException("Injected exception in AppendNewProducerInterceptor.onSend");
return new ProducerRecord<>(
record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
onAckCount++;
if (exception != null) {
onErrorAckCount++;
// the length check just calls the topic() method so that it throws an
// exception if the TopicPartition inside RecordMetadata is null
if (metadata != null && metadata.topic().length() >= 0) {
onErrorAckWithTopicSetCount++;
if (metadata.partition() >= 0)
onErrorAckWithTopicPartitionSetCount++;
}
}
if (throwExceptionOnAck)
throw new KafkaException("Injected exception in AppendNewProducerInterceptor.onAcknowledgement");
}
@Override
public void close() {
}
// if 'on' is true, onSend will always throw an exception
public void injectOnSendError(boolean on) {
throwExceptionOnSend = on;
}
// if 'on' is true, onAcknowledgement will always throw an exception
public void injectOnAcknowledgementError(boolean on) {
throwExceptionOnAck = on;
}
}
@Test
public void testOnSendChain() {
List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
// we are testing two different interceptors by configuring the same interceptor differently, which is not
// how it would be done in KafkaProducer, but ok for testing interceptor callbacks
AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
AppendNewProducerInterceptor interceptor2 = new AppendNewProducerInterceptor("Two");
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);
@@ -139,23 +195,23 @@
// we are testing two different interceptors by configuring the same interceptor differently, which is not
// how it would be done in KafkaProducer, but ok for testing interceptor callbacks
AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
AppendNewProducerInterceptor interceptor2 = new AppendNewProducerInterceptor("Two");
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);
// verify onAck is called on all interceptors
RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);
interceptors.onAcknowledgement(meta, null);
interceptors.onAcknowledgement(meta, null, null);
assertEquals(2, onAckCount);
// verify that onAcknowledgement exceptions do not propagate
interceptor1.injectOnAcknowledgementError(true);
interceptors.onAcknowledgement(meta, null);
interceptors.onAcknowledgement(meta, null, null);
assertEquals(4, onAckCount);
interceptor2.injectOnAcknowledgementError(true);
interceptors.onAcknowledgement(meta, null);
interceptors.onAcknowledgement(meta, null, null);
assertEquals(6, onAckCount);
interceptors.close();
@@ -165,27 +221,29 @@
public void testOnAcknowledgementWithErrorChain() {
List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
AppendNewProducerInterceptor interceptor2 = new AppendNewProducerInterceptor("Two");
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);
// verify that metadata contains both topic and partition
interceptors.onSendError(producerRecord,
new TopicPartition(producerRecord.topic(), producerRecord.partition()),
new KafkaException("Test"));
assertEquals(1, onErrorAckCount);
assertEquals(1, onErrorAckWithTopicPartitionSetCount);
assertEquals(2, onErrorAckCount);
assertEquals(2, onErrorAckWithTopicPartitionSetCount);
// verify that metadata contains both topic and partition (because record already contains partition)
interceptors.onSendError(producerRecord, null, new KafkaException("Test"));
assertEquals(2, onErrorAckCount);
assertEquals(2, onErrorAckWithTopicPartitionSetCount);
assertEquals(4, onErrorAckCount);
assertEquals(4, onErrorAckWithTopicPartitionSetCount);
// if producer record does not contain partition, interceptor should get partition == -1
ProducerRecord<Integer, String> record2 = new ProducerRecord<>("test2", null, 1, "value");
interceptors.onSendError(record2, null, new KafkaException("Test"));
assertEquals(3, onErrorAckCount);
assertEquals(3, onErrorAckWithTopicSetCount);
assertEquals(2, onErrorAckWithTopicPartitionSetCount);
assertEquals(6, onErrorAckCount);
assertEquals(6, onErrorAckWithTopicSetCount);
assertEquals(4, onErrorAckWithTopicPartitionSetCount);
// if producer record does not contain partition, but topic/partition is passed to
// onSendError, then interceptor should get valid partition
@@ -193,15 +251,15 @@
interceptors.onSendError(record2,
new TopicPartition(record2.topic(), reassignedPartition),
new KafkaException("Test"));
assertEquals(4, onErrorAckCount);
assertEquals(4, onErrorAckWithTopicSetCount);
assertEquals(3, onErrorAckWithTopicPartitionSetCount);
assertEquals(8, onErrorAckCount);
assertEquals(8, onErrorAckWithTopicSetCount);
assertEquals(6, onErrorAckWithTopicPartitionSetCount);
// if both record and topic/partition are null, interceptor should not receive metadata
interceptors.onSendError(null, null, new KafkaException("Test"));
assertEquals(5, onErrorAckCount);
assertEquals(4, onErrorAckWithTopicSetCount);
assertEquals(3, onErrorAckWithTopicPartitionSetCount);
assertEquals(10, onErrorAckCount);
assertEquals(8, onErrorAckWithTopicSetCount);
assertEquals(6, onErrorAckWithTopicPartitionSetCount);
interceptors.close();
}