diff --git a/build.gradle b/build.gradle
index 394baf641bc..7ed584ffd54 100644
--- a/build.gradle
+++ b/build.gradle
@@ -754,6 +754,7 @@ project(':clients') {
include "**/org/apache/kafka/clients/producer/*"
include "**/org/apache/kafka/common/*"
include "**/org/apache/kafka/common/errors/*"
+ include "**/org/apache/kafka/common/header/*"
include "**/org/apache/kafka/common/serialization/*"
include "**/org/apache/kafka/common/config/*"
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a6de9a727a8..d7851a581d5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -93,6 +93,7 @@
+
@@ -100,6 +101,11 @@
+
+
+
+
+
@@ -110,6 +116,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index fe9ede82ffd..464091ace5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
@@ -37,6 +39,7 @@ public class ConsumerRecord<K, V> {
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
+ private final Headers headers;
private final K key;
private final V value;
@@ -62,7 +65,8 @@ public class ConsumerRecord<K, V> {
/**
- * Creates a record to be received from a specified topic and partition
+ * Creates a record to be received from a specified topic and partition (provided for
+ * compatibility with Kafka 0.10 before the message format supported headers).
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
@@ -85,6 +89,35 @@ public class ConsumerRecord<K, V> {
int serializedValueSize,
K key,
V value) {
+ this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key, value, new RecordHeaders());
+ }
+
+ /**
+ * Creates a record to be received from a specified topic and partition
+ *
+ * @param topic The topic this record is received from
+ * @param partition The partition of the topic this record is received from
+ * @param offset The offset of this record in the corresponding Kafka partition
+ * @param timestamp The timestamp of the record.
+ * @param timestampType The timestamp type
+ * @param checksum The checksum (CRC32) of the full record
+ * @param serializedKeySize The length of the serialized key
+ * @param serializedValueSize The length of the serialized value
+ * @param key The key of the record, if one exists (null is allowed)
+ * @param value The record contents
+ * @param headers The headers of the record.
+ */
+ public ConsumerRecord(String topic,
+ int partition,
+ long offset,
+ long timestamp,
+ TimestampType timestampType,
+ long checksum,
+ int serializedKeySize,
+ int serializedValueSize,
+ K key,
+ V value,
+ Headers headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
@@ -97,6 +130,7 @@ public class ConsumerRecord<K, V> {
this.serializedValueSize = serializedValueSize;
this.key = key;
this.value = value;
+ this.headers = headers;
}
/**
@@ -113,6 +147,13 @@ public class ConsumerRecord<K, V> {
return this.partition;
}
+ /**
+ * The headers
+ */
+ public Headers headers() {
+ return headers;
+ }
+
/**
* The key (or null if no key is specified)
*/
@@ -177,6 +218,7 @@ public class ConsumerRecord<K, V> {
+ ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
+ ", serialized key size = " + serializedKeySize
+ ", serialized value size = " + serializedValueSize
+ + ", headers = " + headers
+ ", key = " + key + ", value = " + value + ")";
}
}
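Usage sketch (illustrative, not part of the patch above): reading the new ConsumerRecord#headers() accessor on the consumer side. The class name, topic, group id and bootstrap address are placeholder assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeaderReadExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "header-demo");
        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // headers() never returns null; records without headers yield an empty Headers
                for (Header header : record.headers()) {
                    byte[] value = header.value();
                    System.out.println(header.key() + " -> "
                            + (value == null ? "null" : new String(value, StandardCharsets.UTF_8)));
                }
            }
        }
    }
}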
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 947214f2e76..0c5c3858ee8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -58,6 +60,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.ExtendedDeserializer;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -103,8 +106,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
+
+ private final ExtendedDeserializer<K> keyDeserializer;
+ private final ExtendedDeserializer<V> valueDeserializer;
private final IsolationLevel isolationLevel;
private PartitionRecords nextInLineRecords = null;
@@ -136,8 +140,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.fetchSize = fetchSize;
this.maxPollRecords = maxPollRecords;
this.checkCrcs = checkCrcs;
- this.keyDeserializer = keyDeserializer;
- this.valueDeserializer = valueDeserializer;
+ this.keyDeserializer = ensureExtended(keyDeserializer);
+ this.valueDeserializer = ensureExtended(valueDeserializer);
this.completedFetches = new ConcurrentLinkedQueue<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
@@ -146,6 +150,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
subscriptions.addListener(this);
}
+ private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
+ return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
+ }
+
/**
* Represents data about an offset returned by a broker.
*/
@@ -894,18 +902,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
long offset = record.offset();
long timestamp = record.timestamp();
TimestampType timestampType = batch.timestampType();
+ Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
- K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
+ K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
- V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
-
+ V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType, record.checksum(),
keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
- key, value);
+ key, value, headers);
} catch (RuntimeException e) {
throw new SerializationException("Error deserializing key/value for partition " + partition +
" at offset " + record.offset(), e);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e0d9938c4df..deca51f83ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -40,6 +40,9 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -51,6 +54,7 @@ import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
@@ -156,8 +160,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final ExtendedSerializer<K> keySerializer;
+ private final ExtendedSerializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final int requestTimeoutMs;
@@ -238,21 +242,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
- this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
+ this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- this.keySerializer = keySerializer;
+ this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
- this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- Serializer.class);
+ this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- this.valueSerializer = valueSerializer;
+ this.valueSerializer = ensureExtended(valueSerializer);
}
+
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
@@ -326,6 +331,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
+ private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+ return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+ }
+
private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
@@ -641,7 +650,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
- serializedKey = keySerializer.serialize(record.topic(), record.key());
+ serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
@@ -649,16 +658,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
byte[] serializedValue;
try {
- serializedValue = valueSerializer.serialize(record.topic(), record.value());
+ serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
-
int partition = partition(record, serializedKey, serializedValue, cluster);
+
+ setReadOnly(record.headers());
+ Header[] headers = record.headers().toArray();
+
int serializedSize = AbstractRecords.sizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
- serializedKey, serializedValue);
+ serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
@@ -670,7 +682,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
transactionManager.maybeAddPartitionToTransaction(tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, interceptCallback, remainingWaitMs);
+ serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
@@ -723,7 +735,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (transactionManager.isInTransaction()) {
if (transactionManager.isInErrorState()) {
- String errorMessage = "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
+ String errorMessage =
+ "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
Exception lastError = transactionManager.lastError();
if (lastError != null)
throw new KafkaException(errorMessage, lastError);
@@ -735,6 +748,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
+ private void setReadOnly(Headers headers) {
+ if (headers instanceof RecordHeaders) {
+ ((RecordHeaders) headers).setReadOnly();
+ }
+ }
+
/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
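Producer-side sketch (illustrative, not part of the patch) of the send path above: headers stay mutable until send(), which serializes with the headers-aware serializer and then calls setReadOnly(record.headers()). The topic, broker address and header key are assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderWriteExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            // Headers can be added freely before send()...
            record.headers().add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
            // ...but after send() the headers are read-only and further add()/remove()
            // calls throw IllegalStateException.
        }
    }
}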
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a4f59acb8ec..1b4151c5b8a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.ArrayDeque;
@@ -53,9 +54,9 @@ public class MockProducer<K, V> implements Producer<K, V> {
private final Deque<Completion> completions;
private boolean autoComplete;
private Map<TopicPartition, Long> offsets;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
private boolean closed;
+ private final ExtendedSerializer<K> keySerializer;
+ private final ExtendedSerializer<V> valueSerializer;
/**
* Create a mock producer
@@ -77,8 +78,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.partitioner = partitioner;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
+ this.keySerializer = ensureExtended(keySerializer);
+ this.valueSerializer = ensureExtended(valueSerializer);
this.offsets = new HashMap<TopicPartition, Long>();
this.sent = new ArrayList<ProducerRecord<K, V>>();
this.completions = new ArrayDeque<Completion>();
@@ -134,7 +135,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
public void abortTransaction() throws ProducerFencedException {
-
+
+ }
+
+ private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+ return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
}
/**
@@ -273,8 +278,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
+ "].");
return partition;
}
- byte[] keyBytes = keySerializer.serialize(topic, record.key());
- byte[] valueBytes = valueSerializer.serialize(topic, record.value());
+ byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
+ byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());
return this.partitioner.partition(topic, record.key(), keyBytes, record.value(), valueBytes, cluster);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index df896160f35..85428e58c42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
/**
* A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
* partition number, and an optional key and value.
@@ -44,6 +48,7 @@ public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
+ private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
@@ -56,8 +61,9 @@ public class ProducerRecord<K, V> {
* @param timestamp The timestamp of the record
* @param key The key that will be included in the record
* @param value The record contents
+ * @param headers the headers that will be included in the record
*/
- public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
+ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
@@ -71,8 +77,35 @@ public class ProducerRecord<K, V> {
this.key = key;
this.value = value;
this.timestamp = timestamp;
+ this.headers = new RecordHeaders(headers);
+ }
+
+ /**
+ * Creates a record with a specified timestamp to be sent to a specified topic and partition
+ *
+ * @param topic The topic the record will be appended to
+ * @param partition The partition to which the record should be sent
+ * @param timestamp The timestamp of the record
+ * @param key The key that will be included in the record
+ * @param value The record contents
+ */
+ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
+ this(topic, partition, timestamp, key, value, null);
}
+ /**
+ * Creates a record to be sent to a specified topic and partition
+ *
+ * @param topic The topic the record will be appended to
+ * @param partition The partition to which the record should be sent
+ * @param key The key that will be included in the record
+ * @param value The record contents
+ * @param headers The headers that will be included in the record
+ */
+ public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
+ this(topic, partition, null, key, value, headers);
+ }
+
/**
* Creates a record to be sent to a specified topic and partition
*
@@ -82,9 +115,9 @@ public class ProducerRecord<K, V> {
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
- this(topic, partition, null, key, value);
+ this(topic, partition, null, key, value, null);
}
-
+
/**
* Create a record to be sent to Kafka
*
@@ -93,9 +126,9 @@ public class ProducerRecord<K, V> {
* @param value The record contents
*/
public ProducerRecord(String topic, K key, V value) {
- this(topic, null, null, key, value);
+ this(topic, null, null, key, value, null);
}
-
+
/**
* Create a record with no key
*
@@ -103,7 +136,7 @@ public class ProducerRecord<K, V> {
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
- this(topic, null, null, null, value);
+ this(topic, null, null, null, value, null);
}
/**
@@ -113,6 +146,13 @@ public class ProducerRecord<K, V> {
return topic;
}
+ /**
+ * @return The headers
+ */
+ public Headers headers() {
+ return headers;
+ }
+
/**
* @return The key (or null if no key is specified)
*/
@@ -143,10 +183,11 @@ public class ProducerRecord<K, V> {
@Override
public String toString() {
+ String headers = this.headers == null ? "null" : this.headers.toString();
String key = this.key == null ? "null" : this.key.toString();
String value = this.value == null ? "null" : this.value.toString();
String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
- return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value +
+ return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", headers=" + headers + ", key=" + key + ", value=" + value +
", timestamp=" + timestamp + ")";
}
@@ -165,6 +206,8 @@ public class ProducerRecord<K, V> {
return false;
else if (topic != null ? !topic.equals(that.topic) : that.topic != null)
return false;
+ else if (headers != null ? !headers.equals(that.headers) : that.headers != null)
+ return false;
else if (value != null ? !value.equals(that.value) : that.value != null)
return false;
else if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
@@ -177,6 +220,7 @@ public class ProducerRecord<K, V> {
public int hashCode() {
int result = topic != null ? topic.hashCode() : 0;
result = 31 * result + (partition != null ? partition.hashCode() : 0);
+ result = 31 * result + (headers != null ? headers.hashCode() : 0);
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
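Constructor sketch (illustrative, not part of the patch) for the new header-carrying overloads; headers also take part in equals() and hashCode(). The topic, header key and class name are assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class ProducerRecordHeadersExample {
    public static void main(String[] args) {
        Iterable<Header> headers = Arrays.<Header>asList(
                new RecordHeader("schema-id", "42".getBytes(StandardCharsets.UTF_8)));

        // New overload without a timestamp: (topic, partition, key, value, headers).
        ProducerRecord<String, String> a =
                new ProducerRecord<>("demo-topic", null, "key", "value", headers);
        // The same record built through the timestamped overload.
        ProducerRecord<String, String> b =
                new ProducerRecord<>("demo-topic", null, null, "key", "value", headers);

        System.out.println(a.equals(b)); // true: same topic, key, value and headers
    }
}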
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index eba307879f5..6d5ca150bf2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
@@ -75,12 +76,12 @@ public final class ProducerBatch {
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
- public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
+ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
return null;
} else {
- long checksum = this.recordsBuilder.append(timestamp, key, value);
- this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value));
+ long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
+ this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index aa4d9d36b8e..dcbf691b618 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -31,9 +32,10 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
@@ -168,6 +170,7 @@ public final class RecordAccumulator {
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
+ * @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
*/
@@ -175,26 +178,28 @@ public final class RecordAccumulator {
long timestamp,
byte[] key,
byte[] value,
+ Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
+ if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+ RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
- int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value));
+ int size = Math.max(this.batchSize, AbstractRecords.sizeInBytesUpperBound(maxUsableMagic, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
@@ -202,7 +207,7 @@ public final class RecordAccumulator {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
- RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+ RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
@@ -210,7 +215,7 @@ public final class RecordAccumulator {
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
@@ -246,10 +251,10 @@ public final class RecordAccumulator {
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
- private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
+ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
- FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
+ FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
diff --git a/clients/src/main/java/org/apache/kafka/common/header/Header.java b/clients/src/main/java/org/apache/kafka/common/header/Header.java
new file mode 100644
index 00000000000..58869b41fb7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/Header.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header;
+
+public interface Header {
+
+ String key();
+
+ byte[] value();
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/header/Headers.java b/clients/src/main/java/org/apache/kafka/common/header/Headers.java
new file mode 100644
index 00000000000..1796d62944a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/Headers.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header;
+
+public interface Headers extends Iterable<Header> {
+
+ /**
+ * Adds a header (key inside) to the end of the headers, returning this instance.
+ *
+ * @param header the Header to be added
+ * @return this instance of the Headers, once the header is added.
+ * @throws IllegalStateException is thrown if headers are in a read-only state.
+ */
+ Headers add(Header header) throws IllegalStateException;
+
+ /**
+ * Creates and adds a header to the end of the headers, returning this instance.
+ *
+ * @param key of the header to be added.
+ * @param value of the header to be added.
+ * @return this instance of the Headers, once the header is added.
+ * @throws IllegalStateException is thrown if headers are in a read-only state.
+ */
+ Headers add(String key, byte[] value) throws IllegalStateException;
+
+ /**
+ * Removes all headers for the given key, returning this instance.
+ *
+ * @param key to remove all headers for.
+ * @return this instance of the Headers, once the headers for the given key are removed.
+ * @throws IllegalStateException is thrown if headers are in a read-only state.
+ */
+ Headers remove(String key) throws IllegalStateException;
+
+ /**
+ * Returns just one (the very last) header for the given key, if present.
+ *
+ * @param key to get the last header for.
+ * @return the last header matching the given key, or null if no header with that key is present.
+ */
+ Header lastHeader(String key);
+
+ /**
+ * Returns all headers for the given key, in the order they were added in, if present.
+ *
+ * @param key to return the headers for.
+ * @return all headers for the given key, in the order they were added in; if no headers are present an empty iterable is returned.
+ */
+ Iterable headers(String key);
+
+ /**
+ * Returns all headers as an array, in the order they were added in.
+ *
+ * @return the headers as a Header[]; mutating this array will not affect the Headers. If no headers are present an empty array is returned.
+ */
+ Header[] toArray();
+
+}
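A walkthrough of the interface contract (illustrative, not part of the patch), using the RecordHeaders implementation added below; the header keys and values are arbitrary examples.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeadersApiExample {
    public static void main(String[] args) {
        Headers headers = new RecordHeaders();
        headers.add("retry", "1".getBytes(StandardCharsets.UTF_8))
               .add("retry", "2".getBytes(StandardCharsets.UTF_8))
               .add("origin", "billing".getBytes(StandardCharsets.UTF_8));

        // lastHeader returns the most recently added header for the key.
        System.out.println(new String(headers.lastHeader("retry").value(), StandardCharsets.UTF_8)); // 2

        // headers(key) preserves insertion order for duplicate keys.
        for (Header h : headers.headers("retry"))
            System.out.println(h.key() + " = " + new String(h.value(), StandardCharsets.UTF_8));

        // remove drops every header for the key; toArray never returns null.
        headers.remove("retry");
        System.out.println(headers.toArray().length); // 1
    }
}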
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Header.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
similarity index 60%
rename from clients/src/main/java/org/apache/kafka/common/record/Header.java
rename to clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
index 2ca077cc120..a6c5375dbb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Header.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -14,33 +14,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.utils.Utils;
+package org.apache.kafka.common.header.internals;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Objects;
-public class Header {
- private final String key;
- private final ByteBuffer value;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Utils;
- public Header(String key, ByteBuffer value) {
+public class RecordHeader implements Header {
+ private final String key;
+ private ByteBuffer valueBuffer;
+ private byte[] value;
+
+ public RecordHeader(String key, byte[] value) {
Objects.requireNonNull(key, "Null header keys are not permitted");
this.key = key;
this.value = value;
}
- public Header(String key, byte[] value) {
- this(key, Utils.wrapNullable(value));
+ public RecordHeader(String key, ByteBuffer valueBuffer) {
+ Objects.requireNonNull(key, "Null header keys are not permitted");
+ this.key = key;
+ this.valueBuffer = valueBuffer;
}
-
+
public String key() {
return key;
}
- public ByteBuffer value() {
- return value == null ? null : value.duplicate();
+ public byte[] value() {
+ if (value == null && valueBuffer != null) {
+ value = Utils.toArray(valueBuffer);
+ valueBuffer = null;
+ }
+ return value;
}
@Override
@@ -50,15 +59,21 @@ public class Header {
if (o == null || getClass() != o.getClass())
return false;
- Header header = (Header) o;
- return (key == null ? header.key == null : key.equals(header.key)) &&
- (value == null ? header.value == null : value.equals(header.value));
+ RecordHeader header = (RecordHeader) o;
+ return (key == null ? header.key == null : key.equals(header.key)) &&
+ Arrays.equals(value(), header.value());
}
@Override
public int hashCode() {
int result = key != null ? key.hashCode() : 0;
- result = 31 * result + (value != null ? value.hashCode() : 0);
+ result = 31 * result + Arrays.hashCode(value());
return result;
}
+
+ @Override
+ public String toString() {
+ return "RecordHeader(key = " + key + ", value = " + Arrays.toString(value()) + ")";
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
new file mode 100644
index 00000000000..f23d7997ad3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header.internals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.AbstractIterator;
+
+public class RecordHeaders implements Headers {
+
+ private final List<Header> headers;
+ private volatile boolean isReadOnly = false;
+
+ public RecordHeaders() {
+ this((Iterable<Header>) null);
+ }
+
+ public RecordHeaders(Header[] headers) {
+ if (headers == null) {
+ this.headers = new ArrayList<>();
+ } else {
+ this.headers = new ArrayList<>(Arrays.asList(headers));
+ }
+ }
+
+ public RecordHeaders(Iterable<Header> headers) {
+ //Use efficient copy constructor if possible, fallback to iteration otherwise
+ if (headers == null) {
+ this.headers = new ArrayList<>();
+ } else if (headers instanceof RecordHeaders) {
+ this.headers = new ArrayList<>(((RecordHeaders) headers).headers);
+ } else if (headers instanceof Collection) {
+ this.headers = new ArrayList<>((Collection<Header>) headers);
+ } else {
+ this.headers = new ArrayList<>();
+ Iterator<Header> iterator = headers.iterator();
+ while (iterator.hasNext()) {
+ this.headers.add(iterator.next());
+ }
+ }
+ }
+
+ @Override
+ public Headers add(Header header) throws IllegalStateException {
+ canWrite();
+ headers.add(header);
+ return this;
+ }
+
+ @Override
+ public Headers add(String key, byte[] value) throws IllegalStateException {
+ return add(new RecordHeader(key, value));
+ }
+
+ @Override
+ public Headers remove(String key) throws IllegalStateException {
+ canWrite();
+ checkKey(key);
+ Iterator<Header> iterator = iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().key().equals(key)) {
+ iterator.remove();
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public Header lastHeader(String key) {
+ checkKey(key);
+ for (int i = headers.size() - 1; i >= 0; i--) {
+ Header header = headers.get(i);
+ if (header.key().equals(key)) {
+ return header;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Iterable<Header> headers(final String key) {
+ checkKey(key);
+ return new Iterable<Header>() {
+ @Override
+ public Iterator<Header> iterator() {
+ return new FilterByKeyIterator(headers.iterator(), key);
+ }
+ };
+ }
+
+ @Override
+ public Iterator<Header> iterator() {
+ return closeAware(headers.iterator());
+ }
+
+ public void setReadOnly() {
+ this.isReadOnly = true;
+ }
+
+ public Header[] toArray() {
+ return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]);
+ }
+
+ private void checkKey(String key) {
+ if (key == null) {
+ throw new IllegalArgumentException("key cannot be null.");
+ }
+ }
+
+ private void canWrite() {
+ if (isReadOnly) {
+ throw new IllegalStateException("RecordHeaders has been closed.");
+ }
+ }
+
+ private Iterator<Header> closeAware(final Iterator<Header> original) {
+ return new Iterator<Header>() {
+ @Override
+ public boolean hasNext() {
+ return original.hasNext();
+ }
+
+ public Header next() {
+ return original.next();
+ }
+
+ @Override
+ public void remove() {
+ canWrite();
+ original.remove();
+ }
+ };
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RecordHeaders headers1 = (RecordHeaders) o;
+
+ return headers != null ? headers.equals(headers1.headers) : headers1.headers == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return headers != null ? headers.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordHeaders(" +
+ "headers = " + headers +
+ ", isReadOnly = " + isReadOnly +
+ ')';
+ }
+
+ private static final class FilterByKeyIterator extends AbstractIterator<Header> {
+
+ private final Iterator<Header> original;
+ private final String key;
+
+ private FilterByKeyIterator(Iterator<Header> original, String key) {
+ this.original = original;
+ this.key = key;
+ }
+
+ protected Header makeNext() {
+ while (true) {
+ if (original.hasNext()) {
+ Header header = original.next();
+ if (!header.key().equals(key)) {
+ continue;
+ }
+
+ return header;
+ }
+ return this.allDone();
+ }
+ }
+ }
+}
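Read-only behaviour sketch (illustrative, not part of the patch): setReadOnly() is what KafkaProducer.send() invokes via setReadOnly(record.headers()); afterwards every mutation path, including Iterator.remove() through the close-aware iterator, is rejected.

import org.apache.kafka.common.header.internals.RecordHeaders;

public class ReadOnlyHeadersExample {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("k1", new byte[] {1});

        headers.setReadOnly();

        try {
            headers.add("k2", new byte[] {2});
        } catch (IllegalStateException e) {
            // Mutation after the headers were handed to the producer is rejected.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}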
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index ddb2bc7e7b2..85fcb2ad0fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index decc06c7cfd..87df7e4a9c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
@@ -164,9 +165,9 @@ public abstract class AbstractRecords implements Records {
return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
}
- public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value) {
+ public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
- return DefaultRecordBatch.batchSizeUpperBound(key, value, Record.EMPTY_HEADERS);
+ return DefaultRecordBatch.batchSizeUpperBound(key, value, headers);
else
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index a4b1d11661f..e0794d85317 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Checksums;
@@ -223,13 +225,12 @@ public class DefaultRecord implements Record {
ByteUtils.writeVarint(utf8Bytes.length, out);
out.write(utf8Bytes);
- ByteBuffer headerValue = header.value();
+ byte[] headerValue = header.value();
if (headerValue == null) {
ByteUtils.writeVarint(-1, out);
} else {
- int headerValueSize = headerValue.remaining();
- ByteUtils.writeVarint(headerValueSize, out);
- Utils.writeTo(out, headerValue, headerValueSize);
+ ByteUtils.writeVarint(headerValue.length, out);
+ out.write(headerValue);
}
}
@@ -414,7 +415,7 @@ public class DefaultRecord implements Record {
buffer.position(buffer.position() + headerValueSize);
}
- headers[i] = new Header(headerKey, headerValue);
+ headers[i] = new RecordHeader(headerKey, headerValue);
}
return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
@@ -480,12 +481,11 @@ public class DefaultRecord implements Record {
int headerKeySize = Utils.utf8Length(headerKey);
size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize;
- ByteBuffer headerValue = header.value();
+ byte[] headerValue = header.value();
if (headerValue == null) {
size += NULL_VARINT_SIZE_BYTES;
} else {
- int headerValueSize = headerValue.remaining();
- size += ByteUtils.sizeOfVarint(headerValueSize) + headerValueSize;
+ size += ByteUtils.sizeOfVarint(headerValue.length) + headerValue.length;
}
}
return size;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2680f300b35..93cd2eb665d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index eb521346ff3..b9d65a562b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -412,6 +413,7 @@ public class MemoryRecordsBuilder {
return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
}
+
/**
* Append a new record at the next sequential offset.
* @param timestamp The record timestamp
@@ -420,7 +422,19 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
- return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, Record.EMPTY_HEADERS);
+ return append(timestamp, key, value, Record.EMPTY_HEADERS);
+ }
+
+ /**
+ * Append a new record at the next sequential offset.
+ * @param timestamp The record timestamp
+ * @param key The record key
+ * @param value The record value
+ * @param headers The record headers if there are any
+ * @return crc of the record
+ */
+ public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+ return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, headers);
}
/**
@@ -431,7 +445,19 @@ public class MemoryRecordsBuilder {
* @return crc of the record
*/
public long append(long timestamp, byte[] key, byte[] value) {
- return append(timestamp, wrapNullable(key), wrapNullable(value));
+ return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+ }
+
+ /**
+ * Append a new record at the next sequential offset.
+ * @param timestamp The record timestamp
+ * @param key The record key
+ * @param value The record value
+ * @param headers The record headers if there are any
+ * @return crc of the record
+ */
+ public long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
+ return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
}
/**
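Low-level sketch (illustrative, not part of the patch) of the new append(timestamp, key, value, headers) overload; headers are only written for message format v2, which this builder overload should select by default.

import java.nio.ByteBuffer;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;

public class BuilderHeadersExample {
    public static void main(String[] args) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

        Header[] headers = new Header[] {new RecordHeader("h", new byte[] {1})};
        builder.append(10L, "key".getBytes(), "value".getBytes(), headers);

        MemoryRecords records = builder.build();
        for (Record record : records.records())
            System.out.println(record.headers().length); // 1
    }
}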
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 437ee3b22c7..fdf41b31612 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.record;
import java.nio.ByteBuffer;
+import org.apache.kafka.common.header.Header;
+
/**
* A log record is a tuple consisting of a unique offset in the log, a sequence number assigned by
* the producer, a timestamp, a key and a value.
diff --git a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
index 0a5cbcf3008..fd361c417f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 383f6e345b0..53c3ba2ce6f 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -35,7 +35,7 @@ public interface Deserializer<T> extends Closeable {
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
-
+
/**
* Deserialize a record value from a bytearray into a value or object.
* @param topic topic associated with the data
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
new file mode 100644
index 00000000000..5de154ae5dc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.header.Headers;
+
+public interface ExtendedDeserializer<T> extends Deserializer<T> {
+
+ T deserialize(String topic, Headers headers, byte[] data);
+
+ class Wrapper<T> implements ExtendedDeserializer<T> {
+
+ private final Deserializer<T> deserializer;
+
+ public Wrapper(Deserializer<T> deserializer) {
+ this.deserializer = deserializer;
+ }
+
+
+ @Override
+ public T deserialize(String topic, Headers headers, byte[] data) {
+ return deserialize(topic, data);
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ deserializer.configure(configs, isKey);
+ }
+
+ @Override
+ public T deserialize(String topic, byte[] data) {
+ return deserializer.deserialize(topic, data);
+ }
+
+ @Override
+ public void close() {
+ deserializer.close();
+ }
+ }
+}
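Sketch of a header-aware deserializer built on the new interface (illustrative, not part of the patch); the "encoding" header name and the class name are assumptions made for the example.

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ExtendedDeserializer;

public class EncodingAwareDeserializer implements ExtendedDeserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public String deserialize(String topic, byte[] data) {
        // Header-less path kept for older call sites.
        return deserialize(topic, null, data);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        if (data == null)
            return null;
        // Pick the charset from an "encoding" header when present, defaulting to UTF-8.
        String encoding = "UTF-8";
        Header header = headers == null ? null : headers.lastHeader("encoding");
        if (header != null && header.value() != null)
            encoding = new String(header.value());
        try {
            return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Unsupported encoding " + encoding, e);
        }
    }

    @Override
    public void close() { }
}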
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
new file mode 100644
index 00000000000..87406318cb1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.header.Headers;
+
+public interface ExtendedSerializer<T> extends Serializer<T> {
+
+ byte[] serialize(String topic, Headers headers, T data);
+
+ class Wrapper<T> implements ExtendedSerializer<T> {
+
+ private final Serializer<T> serializer;
+
+ public Wrapper(Serializer<T> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public byte[] serialize(String topic, Headers headers, T data) {
+ return serialize(topic, data);
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ serializer.configure(configs, isKey);
+ }
+
+ @Override
+ public byte[] serialize(String topic, T data) {
+ return serializer.serialize(topic, data);
+ }
+
+ @Override
+ public void close() {
+ serializer.close();
+ }
+ }
+}
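The mirror image for the producer side (illustrative, not part of the patch): because serialization runs before KafkaProducer marks the headers read-only, a serializer may stamp metadata onto the record. The "content-type" header and the class name are assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ExtendedSerializer;

public class ContentTypeSerializer implements ExtendedSerializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, String data) {
        return serialize(topic, null, data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        if (data == null)
            return null;
        // Add a content-type header once, if the caller has not set one already.
        if (headers != null && headers.lastHeader("content-type") == null)
            headers.add("content-type", "text/plain".getBytes(StandardCharsets.UTF_8));
        return data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() { }
}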
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 233a6583bfd..43d234c46d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -43,7 +43,6 @@ public interface Serializer<T> extends Closeable {
*/
public byte[] serialize(String topic, T data);
-
/**
* Close this serializer.
* This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a7d2a1b25e0..1a4de98bf8c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -185,6 +185,15 @@ public class Utils {
return toArray(buffer, 0, buffer.remaining());
}
+ /**
+ * Read a byte array from its current position given the size in the buffer
+ * @param buffer The buffer to read from
+ * @param size The number of bytes to read into the array
+ */
+ public static byte[] toArray(ByteBuffer buffer, int size) {
+ return toArray(buffer, 0, size);
+ }
+
/**
* Convert a ByteBuffer to a nullable array.
* @param buffer The buffer to convert
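Small sketch (illustrative, not part of the patch) of the new toArray(buffer, size) overload: it copies size bytes starting at the buffer's current position without advancing the position.

import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.Utils;

public class ToArrayExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        buffer.position(1);
        byte[] slice = Utils.toArray(buffer, 2); // {2, 3}
        System.out.println(slice.length + ", position = " + buffer.position()); // 2, position = 1
    }
}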
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index 9e622331ee3..a8a52834883 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
@@ -42,6 +43,7 @@ public class ConsumerRecordTest {
assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
+ assertEquals(new RecordHeaders(), record.headers());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b41e6acc135..f0dd09c0d4c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -36,12 +36,14 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -76,6 +78,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -335,6 +338,50 @@ public class FetcherTest {
}
}
+ @Test
+ public void testHeaders() {
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time));
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+ builder.append(0L, "key".getBytes(), "value-1".getBytes());
+
+ Header[] headersArray = new Header[1];
+ headersArray[0] = new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
+ builder.append(0L, "key".getBytes(), "value-2".getBytes(), headersArray);
+
+ Header[] headersArray2 = new Header[2];
+ headersArray2[0] = new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
+ headersArray2[1] = new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8));
+ builder.append(0L, "key".getBytes(), "value-3".getBytes(), headersArray2);
+
+ MemoryRecords memoryRecords = builder.build();
+
+ List<ConsumerRecord<byte[], byte[]>> records;
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 1);
+
+ client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(memoryRecords, Errors.NONE, 100L, 0));
+
+ assertEquals(1, fetcher.sendFetches());
+ consumerClient.poll(0);
+ records = fetcher.fetchedRecords().get(tp1);
+
+ assertEquals(3, records.size());
+
+ Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
+
+ ConsumerRecord<byte[], byte[]> record = recordIterator.next();
+ assertNull(record.headers().lastHeader("headerKey"));
+
+ record = recordIterator.next();
+ assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+ assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
+
+ record = recordIterator.next();
+ assertEquals("headerValue2", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
+ assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
+ }
+
@Test
public void testFetchMaxPollRecords() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
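The test above drives the internal Fetcher directly; from application code the same headers surface on ConsumerRecord.headers(). A minimal consumer-side sketch, assuming a local broker at localhost:9092 and a hypothetical topic "events" whose records may carry a "headerKey" header:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class HeaderReadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
        props.put("group.id", "header-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // headers() never returns null; lastHeader() returns null when the key is absent,
                // as the first record in the test above demonstrates.
                Header header = record.headers().lastHeader("headerKey");
                if (header != null)
                    System.out.println("headerKey=" + new String(header.value()));
            }
        }
    }
}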
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 819f15eff39..514426d40c0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -25,8 +25,10 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -51,6 +53,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@@ -347,5 +350,63 @@ public class KafkaProducerTest {
}
Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
}
+
+ @PrepareOnlyThisForTest(Metadata.class)
+ @Test
+ public void testHeaders() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ ExtendedSerializer keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+ ExtendedSerializer valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
+ Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+ MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+
+ String topic = "topic";
+ Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
+
+ final Cluster cluster = new Cluster(
+ "dummy",
+ Collections.singletonList(new Node(0, "host1", 1000)),
+ Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+ Collections.emptySet(),
+ Collections.emptySet());
+
+
+ EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
+
+ PowerMock.replay(metadata);
+
+ String value = "value";
+
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
+ EasyMock.expect(keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once();
+ EasyMock.expect(valueSerializer.serialize(topic, record.headers(), value)).andReturn(value.getBytes()).once();
+
+ PowerMock.replay(keySerializer);
+ PowerMock.replay(valueSerializer);
+
+
+ //ensure headers can be mutated pre send.
+ record.headers().add(new RecordHeader("test", "header2".getBytes()));
+
+ producer.send(record, null);
+
+ //ensure headers are closed and cannot be mutated post send
+ try {
+ record.headers().add(new RecordHeader("test", "test".getBytes()));
+ fail("Expected IllegalStateException to be raised");
+ } catch (IllegalStateException ise) {
+ //expected
+ }
+
+ //ensure existing headers are not changed, and last header for key is still original value
+ assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
+
+ PowerMock.verify(valueSerializer);
+ PowerMock.verify(keySerializer);
+
+ }
}
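The producer test above pins down the header lifecycle: headers can be mutated up to send(), and become read-only afterwards. For reference, a hedged sketch of that flow from user code; the broker address, topic name and header keys below are assumptions, not values taken from this patch.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

public class HeaderSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("events", "value"); // hypothetical topic
            // Headers may be added any time before send(); after send() they are read-only,
            // which is exactly what the test above asserts.
            record.headers().add("source", "demo".getBytes(StandardCharsets.UTF_8));
            record.headers().add(new RecordHeader("trace-id", "42".getBytes(StandardCharsets.UTF_8)));
            producer.send(record);
        }
    }
}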
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index b5a7a60cfa3..dc3c89882df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -46,7 +46,7 @@ public class ProducerRecordTest {
ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
assertFalse(producerRecord.equals(valueMisMatch));
- ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);
+ ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null, null);
assertEquals(nullFieldsRecord, nullFieldsRecord);
assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 0f1cb549546..af599caf389 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -99,7 +99,7 @@ public class RecordAccumulatorTest {
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
// append to the first batch
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(1, partitionBatches.size());
@@ -110,7 +110,7 @@ public class RecordAccumulatorTest {
// this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
@@ -136,7 +136,7 @@ public class RecordAccumulatorTest {
byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -160,7 +160,7 @@ public class RecordAccumulatorTest {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -183,7 +183,7 @@ public class RecordAccumulatorTest {
List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
for (int i = 0; i < appends; i++)
- accum.append(tp, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
}
assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -205,7 +205,7 @@ public class RecordAccumulatorTest {
public void run() {
for (int i = 0; i < msgs; i++) {
try {
- accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, null, maxBlockTimeMs);
+ accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
} catch (Exception e) {
e.printStackTrace();
}
@@ -249,7 +249,7 @@ public class RecordAccumulatorTest {
// Partition on node1 only
for (int i = 0; i < appends; i++)
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -258,14 +258,14 @@ public class RecordAccumulatorTest {
// Add partition on node2 only
for (int i = 0; i < appends; i++)
- accum.append(tp3, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
// Add data for another partition on node1, enough to make data sendable immediately
for (int i = 0; i < appends + 1; i++)
- accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
result = accum.ready(cluster, time.milliseconds());
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
// Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -281,7 +281,7 @@ public class RecordAccumulatorTest {
CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
long now = time.milliseconds();
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -293,7 +293,7 @@ public class RecordAccumulatorTest {
accum.reenqueue(batches.get(0).get(0), now);
// Put message for partition 1 into accumulator
- accum.append(tp2, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
@@ -318,7 +318,7 @@ public class RecordAccumulatorTest {
final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
for (int i = 0; i < 100; i++)
- accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
+ accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
@@ -351,7 +351,7 @@ public class RecordAccumulatorTest {
public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
- accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
+ accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
accum.beginFlush();
assertTrue(accum.flushInProgress());
@@ -379,7 +379,7 @@ public class RecordAccumulatorTest {
}
}
for (int i = 0; i < 100; i++)
- accum.append(new TopicPartition(topic, i % 3), 0L, key, value, new TestCallback(), maxBlockTimeMs);
+ accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
@@ -404,11 +404,11 @@ public class RecordAccumulatorTest {
// Test batches not in retry
for (int i = 0; i < appends; i++) {
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
// Make the batches ready due to batch full
- accum.append(tp1, 0L, key, value, null, 0);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
// Advance the clock to expire the batch.
@@ -438,7 +438,7 @@ public class RecordAccumulatorTest {
// Test batches in retry.
// Create a retried batch
- accum.append(tp1, 0L, key, value, null, 0);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
time.sleep(lingerMs);
readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -479,7 +479,7 @@ public class RecordAccumulatorTest {
if (exception instanceof TimeoutException) {
expiryCallbackCount.incrementAndGet();
try {
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
} catch (InterruptedException e) {
throw new RuntimeException("Unexpected interruption", e);
}
@@ -489,7 +489,7 @@ public class RecordAccumulatorTest {
};
for (int i = 0; i < messagesPerBatch + 1; i++)
- accum.append(tp1, 0L, key, value, callback, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs);
assertEquals(2, accum.batches().get(tp1).size());
assertTrue("First batch not full", accum.batches().get(tp1).peekFirst().isFull());
@@ -514,7 +514,7 @@ public class RecordAccumulatorTest {
CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
- accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
time.sleep(2000);
@@ -549,7 +549,7 @@ public class RecordAccumulatorTest {
(short) 0, (short) 2))));
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager());
- accum.append(tp1, 0L, key, value, null, 0);
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7de378da2d1..934c89560ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -96,7 +96,7 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
long offset = 0;
- Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -121,7 +121,7 @@ public class SenderTest {
apiVersions.update("0", NodeApiVersions.create());
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
- null, MAX_BLOCK_TIMEOUT).future;
+ null, null, MAX_BLOCK_TIMEOUT).future;
// now the partition leader supports only v2
apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
@@ -161,14 +161,14 @@ public class SenderTest {
apiVersions.update("0", NodeApiVersions.create());
Future<RecordMetadata> future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
- null, MAX_BLOCK_TIMEOUT).future;
+ null, null, MAX_BLOCK_TIMEOUT).future;
// now the partition leader supports only v2
apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
- null, MAX_BLOCK_TIMEOUT).future;
+ null, null, MAX_BLOCK_TIMEOUT).future;
// start off support produce request v3
apiVersions.update("0", NodeApiVersions.create());
@@ -212,7 +212,7 @@ public class SenderTest {
public void testQuotaMetrics() throws Exception {
final long offset = 0;
for (int i = 1; i <= 3; i++) {
- accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
+ accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds()); // send produce request
client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
sender.run(time.milliseconds());
@@ -245,7 +245,7 @@ public class SenderTest {
apiVersions
);
// do a successful retry
- Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
String id = client.requests().peek().destination();
@@ -269,7 +269,7 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
// do an unsuccessful retry
- future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send produce request
for (int i = 0; i < maxRetries + 1; i++) {
client.disconnect(client.requests().peek().destination());
@@ -309,7 +309,7 @@ public class SenderTest {
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
- accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
+ accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
String id = client.requests().peek().destination();
@@ -321,7 +321,7 @@ public class SenderTest {
time.sleep(900);
// Now send another message to tp2
- accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, MAX_BLOCK_TIMEOUT);
+ accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
// Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
Cluster cluster2 = TestUtils.singletonCluster("test", 2);
@@ -344,7 +344,7 @@ public class SenderTest {
long offset = 0;
metadata.update(Cluster.empty(), Collections.emptySet(), time.milliseconds());
- Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
metadata.update(cluster, Collections.emptySet(), time.milliseconds());
@@ -360,7 +360,7 @@ public class SenderTest {
time.sleep(Metadata.TOPIC_EXPIRY_MS);
metadata.update(Cluster.empty(), Collections.emptySet(), time.milliseconds());
assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
- future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
metadata.update(cluster, Collections.emptySet(), time.milliseconds());
@@ -416,7 +416,7 @@ public class SenderTest {
apiVersions
);
- Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -469,7 +469,7 @@ public class SenderTest {
apiVersions
);
- Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect.
sender.run(time.milliseconds()); // send.
String id = client.requests().peek().destination();
@@ -518,7 +518,7 @@ public class SenderTest {
apiVersions
);
- Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect.
sender.run(time.milliseconds()); // send.
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 7c5b2b57fcf..a1efa58bfcf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -166,7 +167,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -309,7 +310,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
@@ -365,7 +366,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -380,7 +381,7 @@ public class TransactionManagerTest {
// In the mean time, the user does a second produce to a different partition
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -426,7 +427,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -460,7 +461,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
FutureTransactionalResult commitResult = transactionManager.beginCommittingTransaction();
assertFalse(responseFuture.isDone());
diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
new file mode 100644
index 00000000000..39c1c9c9a7b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.header.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.junit.Test;
+
+public class RecordHeadersTest {
+
+ @Test
+ public void testAdd() {
+ Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+
+ Header header = headers.iterator().next();
+ assertHeader("key", "value", header);
+
+ headers.add(new RecordHeader("key2", "value2".getBytes()));
+
+ assertHeader("key2", "value2", headers.lastHeader("key2"));
+ assertEquals(2, getCount(headers));
+ }
+
+ @Test
+ public void testRemove() {
+ Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+
+ assertTrue(headers.iterator().hasNext());
+
+ headers.remove("key");
+
+ assertFalse(headers.iterator().hasNext());
+ }
+
+ @Test
+ public void testAddRemoveInterleaved() {
+ Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ headers.add(new RecordHeader("key2", "value2".getBytes()));
+
+ assertTrue(headers.iterator().hasNext());
+
+ headers.remove("key");
+
+ assertEquals(1, getCount(headers));
+
+ headers.add(new RecordHeader("key3", "value3".getBytes()));
+
+ assertNull(headers.lastHeader("key"));
+
+ assertHeader("key2", "value2", headers.lastHeader("key2"));
+
+ assertHeader("key3", "value3", headers.lastHeader("key3"));
+
+ assertEquals(2, getCount(headers));
+
+ headers.remove("key2");
+
+ assertNull(headers.lastHeader("key"));
+
+ assertNull(headers.lastHeader("key2"));
+
+ assertHeader("key3", "value3", headers.lastHeader("key3"));
+
+ assertEquals(1, getCount(headers));
+
+ headers.add(new RecordHeader("key3", "value4".getBytes()));
+
+ assertHeader("key3", "value4", headers.lastHeader("key3"));
+
+ assertEquals(2, getCount(headers));
+
+ headers.add(new RecordHeader("key", "valueNew".getBytes()));
+
+ assertEquals(3, getCount(headers));
+
+
+ assertHeader("key", "valueNew", headers.lastHeader("key"));
+
+ headers.remove("key3");
+
+ assertEquals(1, getCount(headers));
+
+ assertNull(headers.lastHeader("key2"));
+
+ headers.remove("key");
+
+ assertFalse(headers.iterator().hasNext());
+ }
+
+ @Test
+ public void testLastHeader() {
+ Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ headers.add(new RecordHeader("key", "value2".getBytes()));
+ headers.add(new RecordHeader("key", "value3".getBytes()));
+
+ assertHeader("key", "value3", headers.lastHeader("key"));
+ assertEquals(3, getCount(headers));
+
+ }
+
+ @Test
+ public void testReadOnly() throws IOException {
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ Iterator<Header> headerIteratorBeforeClose = headers.iterator();
+ headers.setReadOnly();
+ try {
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ fail("IllegalStateException expected as headers are closed");
+ } catch (IllegalStateException ise) {
+ //expected
+ }
+
+ try {
+ headers.remove("key");
+ fail("IllegalStateException expected as headers are closed");
+ } catch (IllegalStateException ise) {
+ //expected
+ }
+
+ try {
+ Iterator<Header> headerIterator = headers.iterator();
+ headerIterator.next();
+ headerIterator.remove();
+ fail("IllegalStateException expected as headers are closed");
+ } catch (IllegalStateException ise) {
+ //expected
+ }
+
+ try {
+ headerIteratorBeforeClose.next();
+ headerIteratorBeforeClose.remove();
+ fail("IllegalStateException expected as headers are closed");
+ } catch (IllegalStateException ise) {
+ //expected
+ }
+ }
+
+ @Test
+ public void testHeaders() throws IOException {
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ headers.add(new RecordHeader("key1", "key1value".getBytes()));
+ headers.add(new RecordHeader("key", "value2".getBytes()));
+ headers.add(new RecordHeader("key2", "key2value".getBytes()));
+
+
+ Iterator<Header> keyHeaders = headers.headers("key").iterator();
+ assertHeader("key", "value", keyHeaders.next());
+ assertHeader("key", "value2", keyHeaders.next());
+ assertFalse(keyHeaders.hasNext());
+
+ keyHeaders = headers.headers("key1").iterator();
+ assertHeader("key1", "key1value", keyHeaders.next());
+ assertFalse(keyHeaders.hasNext());
+
+ keyHeaders = headers.headers("key2").iterator();
+ assertHeader("key2", "key2value", keyHeaders.next());
+ assertFalse(keyHeaders.hasNext());
+
+ }
+
+ @Test
+ public void testNew() throws IOException {
+ RecordHeaders headers = new RecordHeaders();
+ headers.add(new RecordHeader("key", "value".getBytes()));
+ headers.setReadOnly();
+
+ RecordHeaders newHeaders = new RecordHeaders(headers);
+ newHeaders.add(new RecordHeader("key", "value2".getBytes()));
+
+ //Ensure existing headers are not modified
+ assertHeader("key", "value", headers.lastHeader("key"));
+ assertEquals(1, getCount(headers));
+
+ //Ensure new headers are modified
+ assertHeader("key", "value2", newHeaders.lastHeader("key"));
+ assertEquals(2, getCount(newHeaders));
+ }
+
+ private int getCount(Headers headers) {
+ int count = 0;
+ Iterator<Header> headerIterator = headers.iterator();
+ while (headerIterator.hasNext()) {
+ headerIterator.next();
+ count++;
+ }
+ return count;
+ }
+
+ static void assertHeader(String key, String value, Header actual) {
+ assertEquals(key, actual.key());
+ assertTrue(Arrays.equals(value.getBytes(), actual.value()));
+ }
+
+}
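The test class above fixes the semantics of the new RecordHeaders collection: duplicate keys are allowed, lastHeader() returns the most recently added value, headers(key) iterates all values for a key in insertion order, and remove(key) drops every value for that key. A compact standalone sketch of those operations (the class name and header values are illustrative):

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class RecordHeadersSketch {
    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        headers.add(new RecordHeader("retry", "1".getBytes()));
        headers.add(new RecordHeader("retry", "2".getBytes()));
        headers.add(new RecordHeader("origin", "dc-1".getBytes()));

        // Duplicate keys are allowed; lastHeader() returns the most recently added value.
        System.out.println(new String(headers.lastHeader("retry").value())); // "2"

        // headers(key) iterates every value for a key in insertion order.
        for (Header h : headers.headers("retry"))
            System.out.println(new String(h.value())); // "1", then "2"

        // remove(key) drops all headers with that key.
        headers.remove("retry");
        System.out.println(headers.lastHeader("retry")); // null
    }
}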
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index a50c5b261ca..57f4663f644 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@@ -90,8 +92,8 @@ public class DefaultRecordBatchTest {
@Test
public void testSizeInBytes() {
Header[] headers = new Header[] {
- new Header("foo", "value".getBytes()),
- new Header("bar", Utils.wrapNullable(null))
+ new RecordHeader("foo", "value".getBytes()),
+ new RecordHeader("bar", (byte[]) null)
};
long timestamp = System.currentTimeMillis();
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 85024758e0c..251db153485 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -16,7 +16,8 @@
*/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -31,9 +32,9 @@ public class DefaultRecordTest {
@Test
public void testBasicSerde() {
Header[] headers = new Header[] {
- new Header("foo", "value".getBytes()),
- new Header("bar", Utils.wrapNullable(null)),
- new Header("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
+ new RecordHeader("foo", "value".getBytes()),
+ new RecordHeader("bar", (byte[]) null),
+ new RecordHeader("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
};
SimpleRecord[] records = new SimpleRecord[] {
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index c1ee7cdd33f..b1a203f7736 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -26,6 +26,8 @@ import kafka.message.Message
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeaders
/**
* A base consumer used to abstract both old and new consumer
@@ -45,7 +47,8 @@ case class BaseConsumerRecord(topic: String,
timestamp: Long = Message.NoTimestamp,
timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
key: Array[Byte],
- value: Array[Byte])
+ value: Array[Byte],
+ headers: Headers = new RecordHeaders())
class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -97,7 +100,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
record.timestamp,
record.timestampType,
record.key,
- record.value)
+ record.value,
+ record.headers)
}
override def stop() {
@@ -132,7 +136,8 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
messageAndMetadata.timestamp,
messageAndMetadata.timestampType,
messageAndMetadata.key,
- messageAndMetadata.message)
+ messageAndMetadata.message,
+ new RecordHeaders())
}
override def stop() {
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 393fee6b59e..6d27e858fdf 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -138,7 +138,7 @@ object ConsoleConsumer extends Logging {
messageCount += 1
try {
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
- msg.timestampType, 0, 0, 0, msg.key, msg.value), output)
+ msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)
} catch {
case e: Throwable =>
if (skipMessageOnError) {
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d55ed6c1990..b3a7978dc7f 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -43,6 +43,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.record.RecordBatch
/**
@@ -559,7 +560,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
messageAndMetadata.timestamp,
messageAndMetadata.timestampType,
messageAndMetadata.key,
- messageAndMetadata.message)
+ messageAndMetadata.message,
+ new RecordHeaders())
}
override def stop() {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9ec544c35b2..4a49833cb76 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{MetricName, TopicPartition}
import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
+import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
import org.junit.Assert._
@@ -36,6 +37,95 @@ import scala.collection.mutable.Buffer
/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest {
+ @Test
+ def testHeaders() {
+ val numRecords = 1
+ val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+
+ record.headers().add(s"headerKey", s"headerValue".getBytes)
+
+ this.producers.head.send(record)
+
+ assertEquals(0, this.consumers.head.assignment.size)
+ this.consumers.head.assign(List(tp).asJava)
+ assertEquals(1, this.consumers.head.assignment.size)
+
+ this.consumers.head.seek(tp, 0)
+ val records = consumeRecords(consumer = this.consumers.head, numRecords = numRecords)
+
+ assertEquals(numRecords, records.size)
+
+ for (i <- 0 until numRecords) {
+ val record = records(i)
+ val header = record.headers().lastHeader(s"headerKey")
+ assertEquals(s"headerValue", if (header == null) null else new String(header.value()))
+ }
+ }
+
+ @Test
+ def testHeadersExtendedSerializerDeserializer() {
+ val numRecords = 1
+ val record = new ProducerRecord(tp.topic(), tp.partition(), null, s"key".getBytes, s"value".getBytes)
+
+ val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
+
+ var serializer = new ByteArraySerializer()
+
+ override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+ headers.add(s"content-type", s"application/octet-stream".getBytes)
+ serializer.serialize(topic, data)
+ }
+
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
+
+ override def close(): Unit = serializer.close()
+
+ override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
+ fail("method should not be invoked")
+ null
+ }
+ }
+
+
+ val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
+
+ var deserializer = new ByteArrayDeserializer()
+
+ override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+ var header = headers.lastHeader(s"content-type")
+ assertEquals(s"application/octet-stream", if (header == null) null else new String(header.value()))
+ deserializer.deserialize(topic, data)
+ }
+
+ override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
+
+
+ override def close(): Unit = deserializer.close()
+
+ override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
+ fail("method should not be invoked")
+ null
+ }
+
+ }
+
+ val producer0 = new KafkaProducer(this.producerConfig, new ByteArraySerializer(), extendedSerializer)
+ producers += producer0
+ producer0.send(record)
+
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), extendedDeserializer)
+ consumers += consumer0
+
+ assertEquals(0, consumer0.assignment.size)
+ consumer0.assign(List(tp).asJava)
+ assertEquals(1, consumer0.assignment.size)
+
+ consumer0.seek(tp, 0)
+ val records = consumeRecords(consumer = consumer0, numRecords = numRecords)
+
+ assertEquals(numRecords, records.size)
+ }
+
@Test
def testMaxPollRecords() {
val maxPollRecords = 2