KAFKA-14991: KIP-937-Improve message timestamp validation (#14135)

This implementation introduces two new configurations `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms` and deprecates `log.message.timestamp.difference.max.ms`.

The default value for all these three configs is maintained to be Long.MAX_VALUE for backward compatibility but with the newly added configurations we can have a finer control when validating message timestamps that are in the past and the future compared to the broker's timestamp.

To maintain backward compatibility if the default value of `log.message.timestamp.before.max.ms` is not changed, we are assuming users are still using the deprecated config `log.message.timestamp.difference.max.ms` and validation is done using its value. This ensures that existing customers who have customized the value of `log.message.timestamp.difference.max.ms` will continue to see no change in behavior.

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Mehari Beyene 2023-08-24 03:04:55 -07:00 committed by Divij Vaidya
parent 09c074ad8a
commit f91cb6b87b
19 changed files with 397 additions and 40 deletions

View File

@ -197,12 +197,35 @@ public class TopicConfig {
public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
"message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`";
/**
* @deprecated since 3.6, removal planned in 4.0.
* Use message.timestamp.before.max.ms and message.timestamp.after.max.ms instead
*/
@Deprecated
public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms";
public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " +
/**
* @deprecated since 3.6, removal planned in 4.0.
* Use message.timestamp.before.max.ms and message.timestamp.after.max.ms instead
*/
@Deprecated
public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "[DEPRECATED] The maximum difference allowed between " +
"the timestamp when a broker receives a message and the timestamp specified in the message. If " +
"message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
"exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = "message.timestamp.before.max.ms";
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This configuration sets the allowable timestamp " +
"difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " +
"or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
"configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
"timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = "message.timestamp.after.max.ms";
public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp " +
"difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " +
"or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
"configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
"timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
"down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +

View File

@ -774,7 +774,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
config.compact,
config.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
config.messageTimestampBeforeMaxMs,
config.messageTimestampAfterMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion

View File

@ -465,7 +465,14 @@ object KafkaConfig {
val LogMessageFormatVersionProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
val LogMessageTimestampTypeProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)
/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */
@deprecated("3.6")
val LogMessageTimestampDifferenceMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG)
val LogMessageTimestampBeforeMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
val LogMessageTimestampAfterMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
@ -899,10 +906,23 @@ object KafkaConfig {
val LogMessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either " +
"`CreateTime` or `LogAppendTime`."
val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
val LogMessageTimestampDifferenceMaxMsDoc = "[DEPRECATED] The maximum difference allowed between the timestamp when a broker receives " +
"a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
"if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
"The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
val LogMessageTimestampBeforeMaxMsDoc = "This configuration sets the allowable timestamp difference between the " +
"broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " +
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server."
val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
@ -1284,6 +1304,8 @@ object KafkaConfig {
.define(LogMessageFormatVersionProp, STRING, LogConfig.DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_TYPE, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(LogMessageTimestampBeforeMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampBeforeMaxMsDoc)
.define(LogMessageTimestampAfterMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampAfterMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
.define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
@ -1862,7 +1884,37 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
else MetadataVersion.fromVersionString(logMessageFormatVersionString)
def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */
@deprecated("3.6")
def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
// In the transition period before logMessageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
// we are using its value if logMessageTimestampBeforeMaxMs default value hasn't changed.
// See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
@nowarn("cat=deprecation")
def logMessageTimestampBeforeMaxMs: Long = {
val messageTimestampBeforeMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampBeforeMaxMsProp)
if (messageTimestampBeforeMaxMs != LogConfig.DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS) {
messageTimestampBeforeMaxMs
} else {
logMessageTimestampDifferenceMaxMs
}
}
// In the transition period before logMessageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
// we are using its value if logMessageTimestampAfterMaxMs default value hasn't changed.
// See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
@nowarn("cat=deprecation")
def logMessageTimestampAfterMaxMs: Long = {
val messageTimestampAfterMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampAfterMaxMsProp)
if (messageTimestampAfterMaxMs != Long.MaxValue) {
messageTimestampAfterMaxMs
} else {
logMessageTimestampDifferenceMaxMs
}
}
def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
/** ********* Replication configuration ***********/
@ -2450,6 +2502,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, logMessageFormatVersion.version)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, logMessageTimestampDifferenceMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, logLocalRetentionMs)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, logLocalRetentionBytes)

View File

@ -29,9 +29,10 @@ import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
import java.nio.charset.StandardCharsets
import scala.annotation.nowarn
class PlaintextProducerSendTest extends BaseProducerSendTest {
@ -121,16 +122,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testSendWithInvalidCreateTime(quorum: String): Unit = {
@MethodSource(Array("quorumAndTimestampConfigProvider"))
def testSendWithInvalidBeforeAndAfterTimestamp(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
val topicProps = new Properties()
topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, "1000")
// set the TopicConfig for timestamp validation to have 1 minute threshold. Note that recordTimestamp has 5 minutes diff
val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
try {
val e = assertThrows(classOf[ExecutionException],
() => producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause
() => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)).get()).getCause
assertTrue(e.isInstanceOf[InvalidTimestampException])
} finally {
producer.close()
@ -140,13 +143,54 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val compressedProducer = createProducer(compressionType = "gzip")
try {
val e = assertThrows(classOf[ExecutionException],
() => compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause
() => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)).get()).getCause
assertTrue(e.isInstanceOf[InvalidTimestampException])
} finally {
compressedProducer.close()
}
}
@ParameterizedTest
@MethodSource(Array("quorumAndTimestampConfigProvider"))
def testValidBeforeAndAfterTimestampsAtThreshold(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
val topicProps = new Properties()
// set the TopicConfig for timestamp validation to be the same as the record timestamp
topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
assertDoesNotThrow(() => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)))
producer.close()
// Test compressed messages.
val compressedProducer = createProducer(compressionType = "gzip")
assertDoesNotThrow(() => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)))
compressedProducer.close()
}
@ParameterizedTest
@MethodSource(Array("quorumAndTimestampConfigProvider"))
def testValidBeforeAndAfterTimestampsWithinThreshold(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
val topicProps = new Properties()
// set the TopicConfig for timestamp validation to have 10 minute threshold. Note that recordTimestamp has 5 minutes diff
val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
assertDoesNotThrow(() => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)))
producer.close()
// Test compressed messages.
val compressedProducer = createProducer(compressionType = "gzip")
assertDoesNotThrow(() => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)))
compressedProducer.close()
}
// Test that producer with max.block.ms=0 can be used to send in non-blocking mode
// where requests are failed immediately without blocking if metadata is not available
// or buffer is full.
@ -228,3 +272,21 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
}
object PlaintextProducerSendTest {
// See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
@nowarn("cat=deprecation")
def quorumAndTimestampConfigProvider: java.util.stream.Stream[Arguments] = {
val now: Long = System.currentTimeMillis()
val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
java.util.stream.Stream.of[Arguments](
Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)),
Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)),
Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now + fiveMinutesInMs)),
Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)),
Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)),
Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now + fiveMinutesInMs))
)
}
}

View File

@ -618,6 +618,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
@Test
@Disabled // TODO: To be re-enabled once we can make it less flaky: KAFKA-6527
@nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def testDefaultTopicConfig(): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
@ -643,6 +644,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(KafkaConfig.LogPreAllocateProp, true.toString)
props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString)
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
props.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, "1000")
props.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, "1000")
props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000"))
@ -681,11 +684,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.clear()
props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString)
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
props.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, "1000")
props.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, "1000")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString))
consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.CREATE_TIME)
// Verify that invalid configs are not applied
val invalidProps = Map(
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid type
KafkaConfig.LogMessageTimestampBeforeMaxMsProp -> "abc", // Invalid type
KafkaConfig.LogMessageTimestampAfterMaxMsProp -> "abc", // Invalid type
KafkaConfig.LogMessageTimestampTypeProp -> "invalid", // Invalid value
KafkaConfig.LogRollTimeMillisProp -> "0" // Fails KafkaConfig validation
)

View File

@ -73,7 +73,6 @@ abstract class AbstractLogCleanerIntegrationTest {
props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, minCleanableDirtyRatio: java.lang.Float)
props.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, minCompactionLagMs: java.lang.Long)
props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, maxCompactionLagMs: java.lang.Long)
props ++= propertyOverrides

View File

@ -54,7 +54,6 @@ class LogCleanerTest {
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
val logConfig = new LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)

View File

@ -397,4 +397,22 @@ class LogConfigTest {
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
}
}
/* Verify that when the deprecated config LogMessageTimestampDifferenceMaxMsProp has non default value the new configs
* LogMessageTimestampBeforeMaxMsProp and LogMessageTimestampAfterMaxMsProp are not changed from the default we are using
* the deprecated config for backward compatibility.
* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details */
@nowarn("cat=deprecation")
@Test
def testTimestampBeforeMaxMsUsesDeprecatedConfig(): Unit = {
val oneDayInMillis = 24 * 60 * 60 * 1000L
val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
kafkaProps.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, Long.MaxValue.toString)
kafkaProps.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, Long.MaxValue.toString)
kafkaProps.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, oneDayInMillis.toString)
val logProps = KafkaConfig.fromProps(kafkaProps).extractLogConfigMap
assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG))
assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG))
}
}

View File

@ -55,7 +55,6 @@ class LogManagerTest {
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 4096: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
val logConfig = new LogConfig(logProps)
var logDir: File = _
var logManager: LogManager = _

View File

@ -123,6 +123,7 @@ class LogValidatorTest {
magic,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PRODUCER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.IBP_2_3_IV1
@ -154,6 +155,7 @@ class LogValidatorTest {
magic,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -198,6 +200,7 @@ class LogValidatorTest {
targetMagic,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -249,6 +252,7 @@ class LogValidatorTest {
magic,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -314,6 +318,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -361,6 +366,7 @@ class LogValidatorTest {
magic,
TimestampType.CREATE_TIME,
1000L,
1000L,
partitionLeaderEpoch,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -438,6 +444,7 @@ class LogValidatorTest {
magic,
TimestampType.CREATE_TIME,
1000L,
1000L,
partitionLeaderEpoch,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -496,6 +503,7 @@ class LogValidatorTest {
toMagic,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -543,6 +551,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest,
@ -602,6 +611,7 @@ class LogValidatorTest {
magic,
TimestampType.CREATE_TIME,
1000L,
1000L,
partitionLeaderEpoch,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -657,6 +667,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -682,6 +693,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -707,6 +719,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -732,6 +745,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -756,6 +770,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest,
@ -778,6 +793,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -801,6 +817,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -825,6 +842,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -850,6 +868,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -875,6 +894,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -898,6 +918,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -923,6 +944,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -948,6 +970,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -973,6 +996,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -998,6 +1022,7 @@ class LogValidatorTest {
RecordBatch.CURRENT_MAGIC_VALUE,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1020,6 +1045,7 @@ class LogValidatorTest {
RecordBatch.CURRENT_MAGIC_VALUE,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.COORDINATOR,
MetadataVersion.latest
@ -1047,6 +1073,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1070,6 +1097,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1092,6 +1120,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1114,6 +1143,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1137,6 +1167,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1160,6 +1191,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1185,6 +1217,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1210,6 +1243,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1233,6 +1267,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1256,6 +1291,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1277,6 +1313,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1305,6 +1342,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V2,
TimestampType.LOG_APPEND_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.IBP_2_0_IV1
@ -1339,6 +1377,7 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V1,
TimestampType.CREATE_TIME,
1000L,
1000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
@ -1398,6 +1437,73 @@ class LogValidatorTest {
assertEquals(6, e.recordErrors.size)
}
@Test
def testRecordWithPastTimestampIsRejected(): Unit = {
val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
val now = System.currentTimeMillis()
val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 * 1000L)
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesBeforeThreshold,
codec = CompressionType.GZIP)
val e = assertThrows(classOf[RecordValidationException],
() => new LogValidator(
records,
topicPartition,
time,
CompressionType.GZIP,
CompressionType.GZIP,
false,
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
timestampBeforeMaxConfig,
timestampAfterMaxConfig,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
).validateMessagesAndAssignOffsets(
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
)
)
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertFalse(e.recordErrors.isEmpty)
assertEquals(e.recordErrors.size, 3)
}
@Test
def testRecordWithFutureTimestampIsRejected(): Unit = {
val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
val now = System.currentTimeMillis()
val fiveMinutesAfterThreshold = now + timestampAfterMaxConfig + (5 * 60 * 1000L)
val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesAfterThreshold,
codec = CompressionType.GZIP)
val e = assertThrows(classOf[RecordValidationException],
() => new LogValidator(
records,
topicPartition,
time,
CompressionType.GZIP,
CompressionType.GZIP,
false,
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
timestampBeforeMaxConfig,
timestampAfterMaxConfig,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest
).validateMessagesAndAssignOffsets(
PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
)
)
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertFalse(e.recordErrors.isEmpty)
assertEquals(e.recordErrors.size, 3)
}
private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = {
val offset = 1234567
val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@ -1417,6 +1523,7 @@ class LogValidatorTest {
RecordBatch.CURRENT_MAGIC_VALUE,
TimestampType.CREATE_TIME,
5000L,
5000L,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
AppendOrigin.CLIENT,
MetadataVersion.latest

View File

@ -761,6 +761,7 @@ class KafkaConfigTest {
}
@Test
@nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
@ -856,6 +857,8 @@ class KafkaConfigTest {
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogMessageTimestampBeforeMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogMessageTimestampAfterMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")
@ -1094,6 +1097,10 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, false, () => config.logMessageDownConversionEnable)
case TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10009, () => config.logMessageTimestampDifferenceMaxMs)
case TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10015L, () => config.logMessageTimestampBeforeMaxMs)
case TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10016L, () => config.logMessageTimestampAfterMaxMs)
case TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG =>
assertDynamic(kafkaConfigProp, "LogAppendTime", () => config.logMessageTimestampType.name)
case TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG =>

View File

@ -30,7 +30,10 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@ -78,12 +81,13 @@ class ProduceRequestTest extends BaseRequestTest {
new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
@Test
def testProduceWithInvalidTimestamp(): Unit = {
@ParameterizedTest
@MethodSource(Array("timestampConfigProvider"))
def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
val topic = "topic"
val partition = 0
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, "1000")
topicConfig.setProperty(messageTimeStampConfig, "1000")
val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
val leader = partitionToLeader(partition)
@ -96,7 +100,7 @@ class ProduceRequestTest extends BaseRequestTest {
builder.build()
}
val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP)
val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp, CompressionType.GZIP)
val topicPartition = new TopicPartition("topic", partition)
val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
@ -250,3 +254,16 @@ class ProduceRequestTest extends BaseRequestTest {
}
}
object ProduceRequestTest {
@nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details
def timestampConfigProvider: java.util.stream.Stream[Arguments] = {
val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L
java.util.stream.Stream.of[Arguments](
Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() + fiveMinutesInMs))
)
}
}

View File

@ -369,7 +369,6 @@ object TestUtils extends Logging {
props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp))
props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5")

View File

@ -37,6 +37,9 @@
For more detailed information please refer to the Compatibility, Deprecation, and Migration Plan section in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-902%3A+Upgrade+Zookeeper+to+3.8.1">KIP-902</a>.
</li>
<li>The configuration <code>log.message.timestamp.difference.max.ms</code> is deprecated.
Two new configurations, <code>log.message.timestamp.before.max.ms</code> and <code>log.message.timestamp.after.max.ms</code>, have been added.
For more detailed information, please refer to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a>.
<li>
Kafka Streams has introduced a new task assignor, <code>RackAwareTaskAssignor</code>, for computing task assignments which can minimize
cross rack traffic under certain conditions. It works with existing <code>StickyTaskAssignor</code> and <code>HighAvailabilityTaskAssignor</code>.

View File

@ -53,7 +53,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen
MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
new LogValidator(records, new TopicPartition("a", 0),
Time.SYSTEM, compressionType, compressionType, false, messageVersion,
TimestampType.CREATE_TIME, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
MetadataVersion.latest()
).validateMessagesAndAssignOffsetsCompressed(PrimitiveRef.ofLong(startingOffset),
validatorMetricsRecorder, requestLocal.bufferSupplier());

View File

@ -49,7 +49,7 @@ public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchB
MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
new LogValidator(records, new TopicPartition("a", 0),
Time.SYSTEM, CompressionType.NONE, CompressionType.NONE, false,
messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT,
MetadataVersion.latest()
).assignOffsetsNonCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder);
}

View File

@ -83,6 +83,8 @@ public final class ServerTopicConfigSynonyms {
sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)

View File

@ -175,7 +175,12 @@ public class LogConfig extends AbstractConfig {
public static final String DEFAULT_COMPRESSION_TYPE = BrokerCompressionType.PRODUCER.name;
public static final boolean DEFAULT_PREALLOCATE = false;
public static final String DEFAULT_MESSAGE_TIMESTAMP_TYPE = "CreateTime";
/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */
@Deprecated
public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = Long.MAX_VALUE;
public static final long DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS = Long.MAX_VALUE;
public static final long DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS = Long.MAX_VALUE;
public static final boolean DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE = true;
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
@ -192,9 +197,16 @@ public class LogConfig extends AbstractConfig {
public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas";
public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas";
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
@SuppressWarnings("deprecation")
private static final String MESSAGE_FORMAT_VERSION_CONFIG = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
@SuppressWarnings("deprecation")
private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG;
@SuppressWarnings("deprecation")
private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC;
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Collections.unmodifiableSet(Utils.mkSet(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
@ -261,8 +273,12 @@ public class LogConfig extends AbstractConfig {
MESSAGE_FORMAT_VERSION_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, DEFAULT_MESSAGE_TIMESTAMP_TYPE,
in("CreateTime", "LogAppendTime"), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS,
atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS,
atLeast(0), MEDIUM, MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS,
atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS,
atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, DEFAULT_LEADER_REPLICATION_THROTTLED_REPLICAS,
ThrottledReplicaListValidator.INSTANCE, MEDIUM, LEADER_REPLICATION_THROTTLED_REPLICAS_DOC)
.define(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, DEFAULT_FOLLOWER_REPLICATION_THROTTLED_REPLICAS,
@ -309,7 +325,12 @@ public class LogConfig extends AbstractConfig {
public final MetadataVersion messageFormatVersion;
public final TimestampType messageTimestampType;
/* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details regarding the deprecation */
@Deprecated
public final long messageTimestampDifferenceMaxMs;
public final long messageTimestampBeforeMaxMs;
public final long messageTimestampAfterMaxMs;
public final List<String> leaderReplicationThrottledReplicas;
public final List<String> followerReplicationThrottledReplicas;
public final boolean messageDownConversionEnable;
@ -358,6 +379,8 @@ public class LogConfig extends AbstractConfig {
this.messageFormatVersion = MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG));
this.messageTimestampType = TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
this.messageTimestampDifferenceMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG);
this.messageTimestampBeforeMaxMs = getMessageTimestampBeforeMaxMs();
this.messageTimestampAfterMaxMs = getMessageTimestampAfterMaxMs();
this.leaderReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
this.followerReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
this.messageDownConversionEnable = getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
@ -365,6 +388,30 @@ public class LogConfig extends AbstractConfig {
remoteLogConfig = new RemoteLogConfig(this);
}
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
// we are using its value if messageTimestampBeforeMaxMs default value hasn't changed.
@SuppressWarnings("deprecation")
private long getMessageTimestampBeforeMaxMs() {
final Long messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) {
return messageTimestampBeforeMaxMs;
} else {
return messageTimestampDifferenceMaxMs;
}
}
//In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
// we are using its value if messageTimestampAfterMaxMs default value hasn't changed.
@SuppressWarnings("deprecation")
private long getMessageTimestampAfterMaxMs() {
final Long messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) {
return messageTimestampAfterMaxMs;
} else {
return messageTimestampDifferenceMaxMs;
}
}
@SuppressWarnings("deprecation")
public RecordVersion recordVersion() {
return messageFormatVersion.highestSupportedRecordVersion();

View File

@ -102,7 +102,8 @@ public class LogValidator {
private final boolean compactedTopic;
private final byte toMagic;
private final TimestampType timestampType;
private final long timestampDiffMaxMs;
private final long timestampBeforeMaxMs;
private final long timestampAfterMaxMs;
private final int partitionLeaderEpoch;
private final AppendOrigin origin;
private final MetadataVersion interBrokerProtocolVersion;
@ -115,7 +116,8 @@ public class LogValidator {
boolean compactedTopic,
byte toMagic,
TimestampType timestampType,
long timestampDiffMaxMs,
long timestampBeforeMaxMs,
long timestampAfterMaxMs,
int partitionLeaderEpoch,
AppendOrigin origin,
MetadataVersion interBrokerProtocolVersion) {
@ -127,7 +129,8 @@ public class LogValidator {
this.compactedTopic = compactedTopic;
this.toMagic = toMagic;
this.timestampType = timestampType;
this.timestampDiffMaxMs = timestampDiffMaxMs;
this.timestampBeforeMaxMs = timestampBeforeMaxMs;
this.timestampAfterMaxMs = timestampAfterMaxMs;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.origin = origin;
this.interBrokerProtocolVersion = interBrokerProtocolVersion;
@ -208,7 +211,7 @@ public class LogValidator {
int batchIndex = 0;
for (Record record : batch) {
Optional<ApiRecordError> recordError = validateRecord(batch, topicPartition,
record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic,
record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic,
metricsRecorder);
recordError.ifPresent(e -> recordErrors.add(e));
// we fail the batch if any record fails, so we stop appending if any record fails
@ -257,7 +260,7 @@ public class LogValidator {
int batchIndex = 0;
for (Record record : batch) {
Optional<ApiRecordError> recordError = validateRecord(batch, topicPartition, record,
batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, metricsRecorder);
batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
recordError.ifPresent(e -> recordErrors.add(e));
long offset = offsetCounter.value++;
@ -368,7 +371,7 @@ public class LogValidator {
batchIndex, record);
if (!recordError.isPresent()) {
recordError = validateRecord(batch, topicPartition, record, batchIndex, now,
timestampType, timestampDiffMaxMs, compactedTopic, metricsRecorder);
timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
}
if (!recordError.isPresent()
@ -537,7 +540,8 @@ public class LogValidator {
int batchIndex,
long now,
TimestampType timestampType,
long timestampDiffMaxMs,
long timestampBeforeMaxMs,
long timestampAfterMaxMs,
boolean compactedTopic,
MetricsRecorder metricsRecorder) {
if (!record.hasMagic(batch.magic())) {
@ -566,7 +570,7 @@ public class LogValidator {
if (keyError.isPresent())
return keyError;
else
return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs);
return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs);
}
private static Optional<ApiRecordError> validateKey(Record record,
@ -588,20 +592,29 @@ public class LogValidator {
int batchIndex,
long now,
TimestampType timestampType,
long timestampDiffMaxMs) {
if (timestampType == TimestampType.CREATE_TIME
&& record.timestamp() != RecordBatch.NO_TIMESTAMP
&& Math.abs(record.timestamp() - now) > timestampDiffMaxMs)
return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
"Timestamp " + record.timestamp() + " of message with offset " + record.offset()
+ " is out of range. The timestamp should be within [" + (now - timestampDiffMaxMs)
+ ", " + (now + timestampDiffMaxMs) + "]")));
else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME)
long timestampBeforeMaxMs,
long timestampAfterMaxMs) {
if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) {
if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) {
return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
"Timestamp " + record.timestamp() + " of message with offset " + record.offset()
+ " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs)
+ ", " + (now + timestampAfterMaxMs) + "]")));
}
} else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME)
return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
"Invalid timestamp type in message " + record + ". Producer should not set timestamp "
+ "type to LogAppendTime.")));
else
return Optional.empty();
return Optional.empty();
}
private static boolean recordHasInvalidTimestamp(Record record,
long now,
long timestampBeforeMaxMs,
long timestampAfterMaxMs) {
final long timestampDiff = now - record.timestamp();
return timestampDiff > timestampBeforeMaxMs ||
-1 * timestampDiff > timestampAfterMaxMs;
}
private static Optional<ApiRecordError> validateRecordCompression(CompressionType sourceCompression,