From 57e9f98e156186d62e43a9fec19e0a8d49690102 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Sat, 20 Sep 2025 11:46:19 +0800 Subject: [PATCH] KAFKA-19644 Enhance the documentation for producer headers and integration tests (#20524) - Improve the docs for Record Headers. - Add integration tests to verify that the order of headers in a record is preserved when producing and consuming. - Add unit tests for RecordHeaders.java. Reviewers: Ken Huang , Hong-Yi Chen , Chia-Ping Tsai --- .../consumer/PlaintextConsumerTest.java | 10 ++++ .../clients/consumer/ShareConsumerTest.java | 16 +++++-- .../apache/kafka/common/header/Header.java | 17 ++++++- .../apache/kafka/common/header/Headers.java | 24 ++++++---- .../header/internals/RecordHeadersTest.java | 47 +++++++++++++++++++ docs/implementation.html | 1 + 6 files changed, 100 insertions(+), 15 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java index 13e681cfdda..bd92f0c5685 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java @@ -203,7 +203,10 @@ public class PlaintextConsumerTest { ) { var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes()); record.headers().add("headerKey", "headerValue".getBytes()); + record.headers().add("headerKey2", "headerValue2".getBytes()); + record.headers().add("headerKey3", "headerValue3".getBytes()); producer.send(record); + producer.flush(); assertEquals(0, consumer.assignment().size()); consumer.assign(List.of(TP)); @@ -212,8 +215,15 @@ public class PlaintextConsumerTest { consumer.seek(TP, 0); var records = consumeRecords(consumer, numRecords); assertEquals(numRecords, records.size()); + var header = records.get(0).headers().lastHeader("headerKey"); assertEquals("headerValue", header == null ? null : new String(header.value())); + + // Test the order of headers in a record is preserved when producing and consuming + Header[] headers = records.get(0).headers().toArray(); + assertEquals("headerKey", headers[0].key()); + assertEquals("headerKey2", headers[1].key()); + assertEquals("headerKey3", headers[2].key()); } } diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 2335c223b07..998ac2c585d 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -467,6 +467,8 @@ public class ShareConsumerTest { int numRecords = 1; ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); record.headers().add("headerKey", "headerValue".getBytes()); + record.headers().add("headerKey2", "headerValue2".getBytes()); + record.headers().add("headerKey3", "headerValue3".getBytes()); producer.send(record); producer.flush(); @@ -475,11 +477,15 @@ public class ShareConsumerTest { List> records = consumeRecords(shareConsumer, numRecords); assertEquals(numRecords, records.size()); - for (ConsumerRecord consumerRecord : records) { - Header header = consumerRecord.headers().lastHeader("headerKey"); - if (header != null) - assertEquals("headerValue", new String(header.value())); - } + Header header = records.get(0).headers().lastHeader("headerKey"); + assertEquals("headerValue", new String(header.value())); + + // Test the order of headers in a record is preserved when producing and consuming + Header[] headers = records.get(0).headers().toArray(); + assertEquals("headerKey", headers[0].key()); + assertEquals("headerKey2", headers[1].key()); + assertEquals("headerKey3", headers[2].key()); + verifyShareGroupStateTopicRecordsProduced(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/header/Header.java b/clients/src/main/java/org/apache/kafka/common/header/Header.java index 58869b41fb7..e1d0aa00a44 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/Header.java +++ b/clients/src/main/java/org/apache/kafka/common/header/Header.java @@ -16,10 +16,23 @@ */ package org.apache.kafka.common.header; +/** + * A header is a key-value pair. + */ public interface Header { - + + /** + * Returns the key of the header. + * + * @return the header's key; must not be null. + */ String key(); + /** + * Returns the value of the header. + * + * @return the header's value; may be null. + */ byte[] value(); - + } diff --git a/clients/src/main/java/org/apache/kafka/common/header/Headers.java b/clients/src/main/java/org/apache/kafka/common/header/Headers.java index b736cbcabcc..9cce54a5c5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/Headers.java +++ b/clients/src/main/java/org/apache/kafka/common/header/Headers.java @@ -16,12 +16,18 @@ */ package org.apache.kafka.common.header; + +/** + * A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}. + *

+ * The order of headers is preserved in the order they were added. + */ public interface Headers extends Iterable

{ /** * Adds a header (key inside), to the end, returning if the operation succeeded. * - * @param header the Header to be added + * @param header the Header to be added. * @return this instance of the Headers, once the header is added. * @throws IllegalStateException is thrown if headers are in a read-only state. */ @@ -30,17 +36,18 @@ public interface Headers extends Iterable
{ /** * Creates and adds a header, to the end, returning if the operation succeeded. * - * @param key of the header to be added. - * @param value of the header to be added. + * @param key of the header to be added; must not be null. + * @param value of the header to be added; may be null. * @return this instance of the Headers, once the header is added. * @throws IllegalStateException is thrown if headers are in a read-only state. */ Headers add(String key, byte[] value) throws IllegalStateException; /** - * Removes all headers for the given key returning if the operation succeeded. + * Removes all headers for the given key returning if the operation succeeded, + * while preserving the insertion order of the remaining headers. * - * @param key to remove all headers for. + * @param key to remove all headers for; must not be null. * @return this instance of the Headers, once the header is removed. * @throws IllegalStateException is thrown if headers are in a read-only state. */ @@ -49,16 +56,17 @@ public interface Headers extends Iterable
{ /** * Returns just one (the very last) header for the given key, if present. * - * @param key to get the last header for. + * @param key to get the last header for; must not be null. * @return this last header matching the given key, returns null if not present. */ Header lastHeader(String key); /** * Returns all headers for the given key, in the order they were added in, if present. + * The iterator does not support {@link java.util.Iterator#remove()}. * - * @param key to return the headers for. - * @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned. + * @param key to return the headers for; must not be null. + * @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned. */ Iterable
headers(String key); diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java index 3d431f202ec..41104194991 100644 --- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java @@ -47,6 +47,21 @@ public class RecordHeadersTest { assertEquals(2, getCount(headers)); } + @Test + public void testAddHeadersPreserveOrder() { + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("key", "value".getBytes())); + headers.add(new RecordHeader("key2", "value2".getBytes())); + headers.add(new RecordHeader("key3", "value3".getBytes())); + + Header[] headersArr = headers.toArray(); + assertHeader("key", "value", headersArr[0]); + assertHeader("key2", "value2", headersArr[1]); + assertHeader("key3", "value3", headersArr[2]); + + assertEquals(3, getCount(headers)); + } + @Test public void testRemove() { Headers headers = new RecordHeaders(); @@ -59,6 +74,27 @@ public class RecordHeadersTest { assertFalse(headers.iterator().hasNext()); } + @Test + public void testPreserveOrderAfterRemove() { + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("key", "value".getBytes())); + headers.add(new RecordHeader("key2", "value2".getBytes())); + headers.add(new RecordHeader("key3", "value3".getBytes())); + + headers.remove("key"); + Header[] headersArr = headers.toArray(); + assertHeader("key2", "value2", headersArr[0]); + assertHeader("key3", "value3", headersArr[1]); + assertEquals(2, getCount(headers)); + + headers.add(new RecordHeader("key4", "value4".getBytes())); + headers.remove("key3"); + headersArr = headers.toArray(); + assertHeader("key2", "value2", headersArr[0]); + assertHeader("key4", "value4", headersArr[1]); + assertEquals(2, getCount(headers)); + } + @Test public void testAddRemoveInterleaved() { Headers headers = new RecordHeaders(); @@ -127,6 +163,17 @@ public class RecordHeadersTest { } + @Test + public void testHeadersIteratorRemove() { + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("key", "value".getBytes())); + + Iterator
headersIterator = headers.headers("key").iterator(); + headersIterator.next(); + assertThrows(UnsupportedOperationException.class, + headersIterator::remove); + } + @Test public void testReadOnly() { RecordHeaders headers = new RecordHeaders(); diff --git a/docs/implementation.html b/docs/implementation.html index 1573dd3d60e..3be539e0ba8 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -101,6 +101,7 @@ Headers => [Header] headerKey: String headerValueLength: varint Value: byte[] +

The key of a record header is guaranteed to be non-null, while the value of a record header may be null. The order of headers in a record is preserved when producing and consuming.

We use the same varint encoding as Protobuf. More information on the latter can be found here. The count of headers in a record is also encoded as a varint.