produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+
+ // Can't create a ProduceRequest with a version in [3, 7)
+ for (short version = 3; version < 7; version++) {
+ ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, (short) 1, 5000, produceData, null);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ // Works fine with current version (>= 7)
+ ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData).build();
+ }
+
private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) {
for (short version = builder.oldestAllowedVersion(); version <= builder.latestAllowedVersion(); version++) {
assertThrowsInvalidRecordException(builder, version);
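For context, the gate this test exercises is easy to reproduce in isolation: a ProduceRequest carrying ZStandard-compressed records can only be built at version 7 or later. A minimal, illustrative Scala sketch (topic name "test", acks = 1, and timeout = 5000 ms are arbitrary choices):

    import java.util.Collections
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    import org.apache.kafka.common.requests.ProduceRequest

    val zstdRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
      new SimpleRecord("key".getBytes, "value".getBytes))
    val data = Collections.singletonMap(new TopicPartition("test", 0), zstdRecords)

    // Builders for versions in [3, 7) should throw when built:
    val oldBuilder = new ProduceRequest.Builder(3.toShort, 6.toShort, 1.toShort, 5000, data, null)
    // oldBuilder.build(6.toShort) is expected to fail with InvalidRecordException

    // The current version (>= 7) accepts zstd records:
    ProduceRequest.Builder.forCurrentMagic(1.toShort, 5000, data).build()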
diff --git a/config/producer.properties b/config/producer.properties
index 750b95ee0ae..4786b988a29 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -20,7 +20,7 @@
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
-# specify the compression codec for all data generated: none, gzip, snappy, lz4
+# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
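A producer opts into the new codec through the same property shown above; a minimal Scala sketch (broker address, topic name, and serializers are placeholder choices):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("compression.type", "zstd")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord("test", "key", "value")).get()
    producer.close()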
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index bc3602bd69a..e9b16fafeee 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -79,7 +79,9 @@ object ApiVersion {
// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
KAFKA_2_1_IV0,
// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
- KAFKA_2_1_IV1
+ KAFKA_2_1_IV1,
+ // Support ZStandard Compression Codec (KIP-110)
+ KAFKA_2_1_IV2
)
// Map keys are the union of the short and full versions
@@ -270,6 +272,13 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion {
val id: Int = 18
}
+case object KAFKA_2_1_IV2 extends DefaultApiVersion {
+ val shortVersion: String = "2.1"
+ val subVersion = "IV2"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 19
+}
+
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
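The new constant resolves through the usual ApiVersion lookup, and the bare short version "2.1" now maps to the newest sub-version; a quick sketch of the expected mappings:

    import kafka.api.{ApiVersion, KAFKA_2_1_IV2}

    assert(ApiVersion("2.1-IV2") == KAFKA_2_1_IV2)
    assert(ApiVersion("2.1") == KAFKA_2_1_IV2)  // the short version tracks the latest sub-version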
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 64e0aaa72a1..abe3694b035 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -28,6 +28,7 @@ object CompressionCodec {
case GZIPCompressionCodec.codec => GZIPCompressionCodec
case SnappyCompressionCodec.codec => SnappyCompressionCodec
case LZ4CompressionCodec.codec => LZ4CompressionCodec
+ case ZStdCompressionCodec.codec => ZStdCompressionCodec
case _ => throw new UnknownCodecException("%d is an unknown compression codec".format(codec))
}
}
@@ -37,6 +38,7 @@ object CompressionCodec {
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
case LZ4CompressionCodec.name => LZ4CompressionCodec
+ case ZStdCompressionCodec.name => ZStdCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
}
}
@@ -44,7 +46,7 @@ object CompressionCodec {
object BrokerCompressionCodec {
- val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
+ val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))
@@ -87,6 +89,11 @@ case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionC
val name = "lz4"
}
+case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
+ val codec = 4
+ val name = "zstd"
+}
+
case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 0
val name = "none"
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18ab9db22ed..ecbbdb6f03f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,6 +31,8 @@ import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
+import kafka.log.{Log, LogManager, TimestampOffset}
+import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.network.RequestChannel
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
@@ -534,42 +536,56 @@ class KafkaApis(val requestChannel: RequestChannel,
def maybeConvertFetchedData(tp: TopicPartition,
partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
- // Down-conversion of the fetched records is needed when the stored magic version is
- // greater than that supported by the client (as indicated by the fetch request version). If the
- // configured magic version for the topic is less than or equal to that supported by the version of the
- // fetch request, we skip the iteration through the records in order to check the magic version since we
- // know it must be supported. However, if the magic version is changed from a higher version back to a
- // lower version, this check will no longer be valid and we will fail to down-convert the messages
- // which were written in the new format prior to the version downgrade.
- val unconvertedRecords = partitionData.records
val logConfig = replicaManager.getLogConfig(tp)
- val downConvertMagic =
- logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
- if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
- Some(RecordBatch.MAGIC_VALUE_V0)
- else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
- Some(RecordBatch.MAGIC_VALUE_V1)
- else
- None
- }
- // For fetch requests from clients, check if down-conversion is disabled for the particular partition
- if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
- trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ if (logConfig.exists(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) {
+ trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
} else {
- val convertedRecords =
- downConvertMagic.map { magic =>
- trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
- // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
- // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
- // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
- // client.
- new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
- }.getOrElse(unconvertedRecords)
- new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
- convertedRecords)
+ // Down-conversion of the fetched records is needed when the stored magic version is
+ // greater than that supported by the client (as indicated by the fetch request version). If the
+ // configured magic version for the topic is less than or equal to that supported by the version of the
+ // fetch request, we skip the iteration through the records in order to check the magic version since we
+ // know it must be supported. However, if the magic version is changed from a higher version back to a
+ // lower version, this check will no longer be valid and we will fail to down-convert the messages
+ // which were written in the new format prior to the version downgrade.
+ val unconvertedRecords = partitionData.records
+ val downConvertMagic =
+ logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
+ if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+ Some(RecordBatch.MAGIC_VALUE_V0)
+ else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+ Some(RecordBatch.MAGIC_VALUE_V1)
+ else
+ None
+ }
+
+ downConvertMagic match {
+ case Some(magic) =>
+ // For fetch requests from clients, check if down-conversion is disabled for the particular partition
+ if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
+ trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_VERSION)
+ } else {
+ try {
+ trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+ // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+ // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+ // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+ // client.
+ new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+ } catch {
+ case e: UnsupportedCompressionTypeException =>
+ trace("Received unsupported compression type error during down-conversion", e)
+ errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+ }
+ }
+ case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ unconvertedRecords)
+ }
}
}
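Distilled from the branch above: a zstd-compressed partition is only served to fetchers speaking version 10 or later, and zstd data is never down-converted. A standalone, illustrative sketch of that rule (fetchErrorFor is a made-up helper, not broker API):

    // Illustrative only: mirrors the gate in maybeConvertFetchedData.
    def fetchErrorFor(topicCompression: String, fetchVersion: Short): Option[String] =
      if (topicCompression == "zstd" && fetchVersion < 10)
        Some("UNSUPPORTED_COMPRESSION_TYPE")  // old fetchers cannot read zstd batches
      else
        None                                  // served as-is, or down-converted for other codecs

    assert(fetchErrorFor("zstd", 9).isDefined)
    assert(fetchErrorFor("zstd", 10).isEmpty)
    assert(fetchErrorFor("gzip", 1).isEmpty)  // gzip can still be down-converted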
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 700f32c534f..9edda4ea7f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -720,7 +720,7 @@ object KafkaConfig {
val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
- "('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
+ "('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
"'producer' which means retain the original compression codec set by the producer."
/** ********* Kafka Metrics Configuration ***********/
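Topic-level configuration accepts the new value as well; a minimal sketch that creates a zstd-compressed topic via the admin client (broker address and topic name are placeholders):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    val admin = AdminClient.create(props)

    val topic = new NewTopic("zstd-topic", 1, 1.toShort)
      .configs(Collections.singletonMap("compression.type", "zstd"))
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()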
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 930281a342c..aeeaf29516a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -62,7 +62,7 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val fetchRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 9
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
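The ladder above pins the replica fetcher's request version to the inter-broker protocol; a condensed, illustrative sketch of the rungs visible in this hunk (older rungs omitted):

    import kafka.api._

    // Illustrative only: condensed from the ladder above.
    def fetchVersionFor(ibp: ApiVersion): Short =
      if (ibp >= KAFKA_2_1_IV2) 10          // zstd-aware fetch (KIP-110)
      else if (ibp >= KAFKA_2_0_IV1) 8
      else if (ibp >= KAFKA_1_1_IV0) 7
      else if (ibp >= KAFKA_0_11_0_IV1) 5
      else ???                              // older rungs not shown in this hunk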
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 8d8c42d36cf..a9099954564 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -120,7 +120,7 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." +
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
"If specified without value, then it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index bb49884686b..24193521f91 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -113,7 +113,8 @@ object ProducerCompressionTest {
Array("none"),
Array("gzip"),
Array("snappy"),
- Array("lz4")
+ Array("lz4"),
+ Array("zstd")
).asJava
}
}
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index d2d115b2cfa..1ffa695f48c 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -80,9 +80,10 @@ class ApiVersionTest {
assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
- assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1"))
+ assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1"))
assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1-IV1"))
+ assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1-IV2"))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 266bb391e2e..232cfdbf1c1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -131,6 +131,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
@Test
def testCleanerWithMessageFormatV0(): Unit = {
+ // zstd compression is not supported with older message formats
+ if (codec == CompressionType.ZSTD)
+ return
+
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
val maxMessageSize = codec match {
@@ -181,6 +185,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
@Test
def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
+ // zstd compression is not supported with older message formats
+ if (codec == CompressionType.ZSTD)
+ return
+
val maxMessageSize = 192
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
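These early returns exist because the record builder itself rejects zstd below message format v2; a minimal sketch of the expected failure (assuming the builder validates the magic/codec combination, as these tests rely on):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}

    // Expected to throw IllegalArgumentException: zstd requires magic >= 2.
    try {
      MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, CompressionType.ZSTD,
        new SimpleRecord("key".getBytes, "value".getBytes))
    } catch {
      case _: IllegalArgumentException => // zstd is rejected for v0/v1 formats
    }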
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index b4315d105de..72a28549cf5 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -22,6 +22,7 @@ import java.util.{Optional, Properties}
import kafka.api.KAFKA_0_11_0_IV2
import kafka.log.LogConfig
+import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@@ -424,6 +425,119 @@ class FetchRequestTest extends BaseRequestTest {
assertFalse(resp4.responseData().containsKey(bar0))
}
+ @Test
+ def testZStdCompressedTopic(): Unit = {
+ // ZSTD compressed topic
+ val topicConfig = Map(LogConfig.CompressionTypeProp -> ZStdCompressionCodec.name)
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
+
+ // Produce messages (v2)
+ producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key1", "value1")).get
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key2", "value2")).get
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key3", "value3")).get
+ producer.close()
+
+ // fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE error occurs
+ val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res0 = sendFetchRequest(leaderId, req0)
+ val data0 = res0.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data0.error)
+
+ // fetch request with version 10: works fine!
+ val req1 = new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+ val res1 = sendFetchRequest(leaderId, req1)
+ val data1 = res1.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data1.error)
+ assertEquals(3, records(data1).size)
+ }
+
+ @Test
+ def testZStdCompressedRecords(): Unit = {
+ // Producer compressed topic
+ val topicConfig = Map(LogConfig.CompressionTypeProp -> ProducerCompressionCodec.name,
+ LogConfig.MessageFormatVersionProp -> "2.0.0")
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
+
+ // Produce GZIP compressed messages (v2)
+ val producer1 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ compressionType = GZIPCompressionCodec.name,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer1.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key1", "value1")).get
+ producer1.close()
+ // Produce ZSTD compressed messages (v2)
+ val producer2 = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ compressionType = ZStdCompressionCodec.name,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer2.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key2", "value2")).get
+ producer2.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key3", "value3")).get
+ producer2.close()
+
+ // fetch request with fetch version v1 (magic 0):
+ // gzip compressed record is returned with down-conversion.
+ // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
+ val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res0 = sendFetchRequest(leaderId, req0)
+ val data0 = res0.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data0.error)
+ assertEquals(1, records(data0).size)
+
+ val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
+ .setMaxBytes(800).build()
+
+ val res1 = sendFetchRequest(leaderId, req1)
+ val data1 = res1.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data1.error)
+
+ // fetch request with fetch version v3 (magic 1):
+ // gzip compressed record is returned with down-conversion.
+ // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
+ val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res2 = sendFetchRequest(leaderId, req2)
+ val data2 = res2.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data2.error)
+ assertEquals(1, records(data2).size)
+
+ val req3 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
+ .setMaxBytes(800).build()
+
+ val res3 = sendFetchRequest(leaderId, req3)
+ val data3 = res3.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data3.error)
+
+ // fetch request with version 10: works fine!
+ val req4 = new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+ val res4 = sendFetchRequest(leaderId, req4)
+ val data4 = res4.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data4.error)
+ assertEquals(3, records(data4).size)
+ }
+
private def records(partitionData: FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = {
partitionData.records.records.asScala.toIndexedSeq
}
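None of this gating is visible to up-to-date clients, which negotiate fetch version 10 automatically; a minimal consumption sketch (broker address, group id, and topic are placeholders):

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "zstd-demo")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singleton("test"))
    val records = consumer.poll(Duration.ofSeconds(1))  // zstd batches are decompressed transparently
    consumer.close()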
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 4e66494374f..b1f3af145b9 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,10 +17,14 @@
package kafka.server
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.message.ZStdCompressionCodec
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test
@@ -111,6 +115,31 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals(-1, partitionResponse.logAppendTime)
}
+ @Test
+ def testZSTDProduceRequest(): Unit = {
+ val topic = "topic"
+ val partition = 0
+
+ // Create a single-partition topic compressed with ZSTD
+ val topicConfig = new Properties
+ topicConfig.setProperty(LogConfig.CompressionTypeProp, ZStdCompressionCodec.name)
+ val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+ val leader = partitionToLeader(partition)
+ val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = Map(topicPartition -> memoryRecords)
+
+ // produce request with v7: works fine!
+ val res1 = sendProduceRequest(leader,
+ new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build())
+ val (tp, partitionResponse) = res1.responses.asScala.head
+ assertEquals(topicPartition, tp)
+ assertEquals(Errors.NONE, partitionResponse.error)
+ assertEquals(0, partitionResponse.baseOffset)
+ assertEquals(-1, partitionResponse.logAppendTime)
+ }
+
private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {
val response = connectAndSend(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId))
ProduceResponse.parse(response, request.version)
diff --git a/docs/design.html b/docs/design.html
index bdc7e637ea9..0061a53c49d 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -136,7 +136,7 @@
Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will
remain compressed in the log and will only be decompressed by the consumer.
- Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on compression can be found here.
+ Kafka supports GZIP, Snappy, LZ4 and ZStandard compression protocols. More details on compression can be found here.
diff --git a/docs/implementation.html b/docs/implementation.html
index 4ecce7b4485..cc2b0f47ffe 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -44,6 +44,7 @@
1: gzip
2: snappy
3: lz4
+ 4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
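As the layout shows, the codec id occupies the low three bits of the attributes field; a one-line decoding sketch:

    def compressionId(attributes: Byte): Int = attributes & 0x07  // 0=none, 1=gzip, 2=snappy, 3=lz4, 4=zstd
    assert(compressionId(0x04.toByte) == 4)  // a batch compressed with zstd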
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 9ebf9e2261d..e22885e06a1 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -84,7 +84,8 @@ versions += [
slf4j: "1.7.25",
snappy: "1.1.7.2",
zkclient: "0.10",
- zookeeper: "3.4.13"
+ zookeeper: "3.4.13",
+ zstd: "1.3.5-4"
]
libs += [
@@ -143,5 +144,6 @@ libs += [
zkclient: "com.101tec:zkclient:$versions.zkclient",
zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
- mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact"
+ mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
+ zstd: "com.github.luben:zstd-jni:$versions.zstd",
]
diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py
index 165e11add19..2085d9b6259 100644
--- a/tests/kafkatest/tests/client/compression_test.py
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -54,7 +54,7 @@ class CompressionTest(ProduceConsumeValidateTest):
return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@cluster(num_nodes=7)
- @parametrize(compression_types=["snappy","gzip","lz4","none"])
+ @parametrize(compression_types=["snappy","gzip","lz4","zstd","none"])
def test_compressed_topic(self, compression_types):
"""Test produce => consume => validate for compressed topics
Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1