KAFKA-19644 Enhance the documentation for producer headers and integration tests (#20524)

- Improve the docs for Record Headers.
- Add integration tests to verify that the order of headers in a record
is preserved when producing and consuming.
- Add unit tests for RecordHeaders.java.

Reviewers: Ken Huang <s7133700@gmail.com>, Hong-Yi Chen
 <apalan60@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Jhen-Yung Hsu 2025-09-20 11:46:19 +08:00 committed by GitHub
parent 848e3d0092
commit 57e9f98e15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 100 additions and 15 deletions

View File

@ -203,7 +203,10 @@ public class PlaintextConsumerTest {
) {
var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
record.headers().add("headerKey", "headerValue".getBytes());
record.headers().add("headerKey2", "headerValue2".getBytes());
record.headers().add("headerKey3", "headerValue3".getBytes());
producer.send(record);
producer.flush();
assertEquals(0, consumer.assignment().size());
consumer.assign(List.of(TP));
@ -212,8 +215,15 @@ public class PlaintextConsumerTest {
consumer.seek(TP, 0);
var records = consumeRecords(consumer, numRecords);
assertEquals(numRecords, records.size());
var header = records.get(0).headers().lastHeader("headerKey");
assertEquals("headerValue", header == null ? null : new String(header.value()));
// Test the order of headers in a record is preserved when producing and consuming
Header[] headers = records.get(0).headers().toArray();
assertEquals("headerKey", headers[0].key());
assertEquals("headerKey2", headers[1].key());
assertEquals("headerKey3", headers[2].key());
}
}

View File

@ -467,6 +467,8 @@ public class ShareConsumerTest {
int numRecords = 1;
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
record.headers().add("headerKey", "headerValue".getBytes());
record.headers().add("headerKey2", "headerValue2".getBytes());
record.headers().add("headerKey3", "headerValue3".getBytes());
producer.send(record);
producer.flush();
@ -475,11 +477,15 @@ public class ShareConsumerTest {
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
assertEquals(numRecords, records.size());
for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
Header header = consumerRecord.headers().lastHeader("headerKey");
if (header != null)
assertEquals("headerValue", new String(header.value()));
}
Header header = records.get(0).headers().lastHeader("headerKey");
assertEquals("headerValue", new String(header.value()));
// Test the order of headers in a record is preserved when producing and consuming
Header[] headers = records.get(0).headers().toArray();
assertEquals("headerKey", headers[0].key());
assertEquals("headerKey2", headers[1].key());
assertEquals("headerKey3", headers[2].key());
verifyShareGroupStateTopicRecordsProduced();
}
}

View File

@ -16,10 +16,23 @@
*/
package org.apache.kafka.common.header;
/**
* A header is a key-value pair.
*/
public interface Header {
/**
* Returns the key of the header.
*
* @return the header's key; must not be null.
*/
String key();
/**
* Returns the value of the header.
*
* @return the header's value; may be null.
*/
byte[] value();
}

View File

@ -16,12 +16,18 @@
*/
package org.apache.kafka.common.header;
/**
* A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}.
* <p>
* The order of headers is preserved in the order they were added.
*/
public interface Headers extends Iterable<Header> {
/**
* Adds a header (key inside), to the end, returning if the operation succeeded.
*
* @param header the Header to be added
* @param header the Header to be added.
* @return this instance of the Headers, once the header is added.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
@ -30,17 +36,18 @@ public interface Headers extends Iterable<Header> {
/**
* Creates and adds a header, to the end, returning if the operation succeeded.
*
* @param key of the header to be added.
* @param value of the header to be added.
* @param key of the header to be added; must not be null.
* @param value of the header to be added; may be null.
* @return this instance of the Headers, once the header is added.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
Headers add(String key, byte[] value) throws IllegalStateException;
/**
* Removes all headers for the given key returning if the operation succeeded.
* Removes all headers for the given key returning if the operation succeeded,
* while preserving the insertion order of the remaining headers.
*
* @param key to remove all headers for.
* @param key to remove all headers for; must not be null.
* @return this instance of the Headers, once the header is removed.
* @throws IllegalStateException is thrown if headers are in a read-only state.
*/
@ -49,15 +56,16 @@ public interface Headers extends Iterable<Header> {
/**
* Returns just one (the very last) header for the given key, if present.
*
* @param key to get the last header for.
* @param key to get the last header for; must not be null.
* @return this last header matching the given key, returns null if not present.
*/
Header lastHeader(String key);
/**
* Returns all headers for the given key, in the order they were added in, if present.
* The iterator does not support {@link java.util.Iterator#remove()}.
*
* @param key to return the headers for.
* @param key to return the headers for; must not be null.
* @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned.
*/
Iterable<Header> headers(String key);

View File

@ -47,6 +47,21 @@ public class RecordHeadersTest {
assertEquals(2, getCount(headers));
}
@Test
public void testAddHeadersPreserveOrder() {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
headers.add(new RecordHeader("key2", "value2".getBytes()));
headers.add(new RecordHeader("key3", "value3".getBytes()));
Header[] headersArr = headers.toArray();
assertHeader("key", "value", headersArr[0]);
assertHeader("key2", "value2", headersArr[1]);
assertHeader("key3", "value3", headersArr[2]);
assertEquals(3, getCount(headers));
}
@Test
public void testRemove() {
Headers headers = new RecordHeaders();
@ -59,6 +74,27 @@ public class RecordHeadersTest {
assertFalse(headers.iterator().hasNext());
}
@Test
public void testPreserveOrderAfterRemove() {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
headers.add(new RecordHeader("key2", "value2".getBytes()));
headers.add(new RecordHeader("key3", "value3".getBytes()));
headers.remove("key");
Header[] headersArr = headers.toArray();
assertHeader("key2", "value2", headersArr[0]);
assertHeader("key3", "value3", headersArr[1]);
assertEquals(2, getCount(headers));
headers.add(new RecordHeader("key4", "value4".getBytes()));
headers.remove("key3");
headersArr = headers.toArray();
assertHeader("key2", "value2", headersArr[0]);
assertHeader("key4", "value4", headersArr[1]);
assertEquals(2, getCount(headers));
}
@Test
public void testAddRemoveInterleaved() {
Headers headers = new RecordHeaders();
@ -127,6 +163,17 @@ public class RecordHeadersTest {
}
@Test
public void testHeadersIteratorRemove() {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("key", "value".getBytes()));
Iterator<Header> headersIterator = headers.headers("key").iterator();
headersIterator.next();
assertThrows(UnsupportedOperationException.class,
headersIterator::remove);
}
@Test
public void testReadOnly() {
RecordHeaders headers = new RecordHeaders();

View File

@ -101,6 +101,7 @@ Headers => [Header]</code></pre>
headerKey: String
headerValueLength: varint
Value: byte[]</code></pre>
<p>The key of a record header is guaranteed to be non-null, while the value of a record header may be null. The order of headers in a record is preserved when producing and consuming.</p>
<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
is also encoded as a varint.</p>