diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 5be3057b62d..cad32445cd9 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -197,12 +197,35 @@ public class TopicConfig { public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " + "message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`"; + /** + * @deprecated since 3.6, removal planned in 4.0. + * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms instead + */ + @Deprecated public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms"; - public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " + + + /** + * @deprecated since 3.6, removal planned in 4.0. + * Use message.timestamp.before.max.ms and message.timestamp.after.max.ms instead + */ + @Deprecated + public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "[DEPRECATED] The maximum difference allowed between " + "the timestamp when a broker receives a message and the timestamp specified in the message. If " + "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " + "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; + public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = "message.timestamp.before.max.ms"; + public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This configuration sets the allowable timestamp " + + "difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " + + "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + + "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + + "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; + public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = "message.timestamp.after.max.ms"; + public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp " + + "difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " + + "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + + "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + + "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable"; public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " + "down-conversion of message formats is enabled to satisfy consume requests. When set to false, " + diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 1b191844e77..72e0784e37d 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -774,7 +774,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, config.compact, config.recordVersion.value, config.messageTimestampType, - config.messageTimestampDifferenceMaxMs, + config.messageTimestampBeforeMaxMs, + config.messageTimestampAfterMaxMs, leaderEpoch, origin, interBrokerProtocolVersion diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ddc206f7ed8..1245ad0a3eb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -465,7 +465,14 @@ object KafkaConfig { val LogMessageFormatVersionProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) val LogMessageTimestampTypeProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG) + + /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */ + @deprecated("3.6") val LogMessageTimestampDifferenceMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG) + + val LogMessageTimestampBeforeMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG) + val LogMessageTimestampAfterMaxMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG) + val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) @@ -899,10 +906,23 @@ object KafkaConfig { val LogMessageTimestampTypeDoc = "Define whether the timestamp in the message is message create time or log append time. The value should be either " + "`CreateTime` or `LogAppendTime`." - val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " + + val LogMessageTimestampDifferenceMaxMsDoc = "[DEPRECATED] The maximum difference allowed between the timestamp when a broker receives " + "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " + "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." + "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling." + + val LogMessageTimestampBeforeMaxMsDoc = "This configuration sets the allowable timestamp difference between the " + + "broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's " + + "timestamp, with the maximum allowable difference determined by the value set in this configuration. " + + "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " + + "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."; + + val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " + + "message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " + + "timestamp, with the maximum allowable difference determined by the value set in this configuration. " + + "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " + + "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."; + val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server." val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " + @@ -1284,6 +1304,8 @@ object KafkaConfig { .define(LogMessageFormatVersionProp, STRING, LogConfig.DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM, LogMessageFormatVersionDoc) .define(LogMessageTimestampTypeProp, STRING, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_TYPE, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc) .define(LogMessageTimestampDifferenceMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) + .define(LogMessageTimestampBeforeMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampBeforeMaxMsDoc) + .define(LogMessageTimestampAfterMaxMsProp, LONG, LogConfig.DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS, atLeast(0), MEDIUM, LogMessageTimestampAfterMaxMsDoc) .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc) .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc) @@ -1862,7 +1884,37 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami else MetadataVersion.fromVersionString(logMessageFormatVersionString) def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp)) + + /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */ + @deprecated("3.6") def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp) + + // In the transition period before logMessageTimestampDifferenceMaxMs is removed, to maintain backward compatibility, + // we are using its value if logMessageTimestampBeforeMaxMs default value hasn't changed. + // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details + @nowarn("cat=deprecation") + def logMessageTimestampBeforeMaxMs: Long = { + val messageTimestampBeforeMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampBeforeMaxMsProp) + if (messageTimestampBeforeMaxMs != LogConfig.DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS) { + messageTimestampBeforeMaxMs + } else { + logMessageTimestampDifferenceMaxMs + } + } + + // In the transition period before logMessageTimestampDifferenceMaxMs is removed, to maintain backward compatibility, + // we are using its value if logMessageTimestampAfterMaxMs default value hasn't changed. + // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details + @nowarn("cat=deprecation") + def logMessageTimestampAfterMaxMs: Long = { + val messageTimestampAfterMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampAfterMaxMsProp) + if (messageTimestampAfterMaxMs != Long.MaxValue) { + messageTimestampAfterMaxMs + } else { + logMessageTimestampDifferenceMaxMs + } + } + def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** ********* Replication configuration ***********/ @@ -2450,6 +2502,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, logMessageFormatVersion.version) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, logMessageTimestampDifferenceMaxMs: java.lang.Long) + logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long) + logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long) logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, logLocalRetentionMs) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, logLocalRetentionBytes) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 6ebb51faeeb..ac7b775c228 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -29,9 +29,10 @@ import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource} import java.nio.charset.StandardCharsets +import scala.annotation.nowarn class PlaintextProducerSendTest extends BaseProducerSendTest { @@ -121,16 +122,18 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft")) - def testSendWithInvalidCreateTime(quorum: String): Unit = { + @MethodSource(Array("quorumAndTimestampConfigProvider")) + def testSendWithInvalidBeforeAndAfterTimestamp(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = { val topicProps = new Properties() - topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, "1000") + // set the TopicConfig for timestamp validation to have 1 minute threshold. Note that recordTimestamp has 5 minutes diff + val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L + topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString) TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) val producer = createProducer() try { val e = assertThrows(classOf[ExecutionException], - () => producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause + () => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)).get()).getCause assertTrue(e.isInstanceOf[InvalidTimestampException]) } finally { producer.close() @@ -140,13 +143,54 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { val compressedProducer = createProducer(compressionType = "gzip") try { val e = assertThrows(classOf[ExecutionException], - () => compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()).getCause + () => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes)).get()).getCause assertTrue(e.isInstanceOf[InvalidTimestampException]) } finally { compressedProducer.close() } } + @ParameterizedTest + @MethodSource(Array("quorumAndTimestampConfigProvider")) + def testValidBeforeAndAfterTimestampsAtThreshold(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = { + val topicProps = new Properties() + + // set the TopicConfig for timestamp validation to be the same as the record timestamp + topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + + val producer = createProducer() + + assertDoesNotThrow(() => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes))) + producer.close() + + // Test compressed messages. + val compressedProducer = createProducer(compressionType = "gzip") + assertDoesNotThrow(() => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes))) + compressedProducer.close() + } + + @ParameterizedTest + @MethodSource(Array("quorumAndTimestampConfigProvider")) + def testValidBeforeAndAfterTimestampsWithinThreshold(quorum: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = { + val topicProps = new Properties() + + // set the TopicConfig for timestamp validation to have 10 minute threshold. Note that recordTimestamp has 5 minutes diff + val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L + topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString) + TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + + val producer = createProducer() + + assertDoesNotThrow(() => producer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes))) + producer.close() + + // Test compressed messages. + val compressedProducer = createProducer(compressionType = "gzip") + assertDoesNotThrow(() => compressedProducer.send(new ProducerRecord(topic, 0, recordTimestamp, "key".getBytes, "value".getBytes))) + compressedProducer.close() + } + // Test that producer with max.block.ms=0 can be used to send in non-blocking mode // where requests are failed immediately without blocking if metadata is not available // or buffer is full. @@ -228,3 +272,21 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } } + +object PlaintextProducerSendTest { + + // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details + @nowarn("cat=deprecation") + def quorumAndTimestampConfigProvider: java.util.stream.Stream[Arguments] = { + val now: Long = System.currentTimeMillis() + val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L + java.util.stream.Stream.of[Arguments]( + Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)), + Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)), + Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now + fiveMinutesInMs)), + Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)), + Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(now - fiveMinutesInMs)), + Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(now + fiveMinutesInMs)) + ) + } +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index cc4e6eeead6..c92287184a8 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -618,6 +618,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup @Test @Disabled // TODO: To be re-enabled once we can make it less flaky: KAFKA-6527 + @nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details def testDefaultTopicConfig(): Unit = { val (producerThread, consumerThread) = startProduceConsume(retries = 0) @@ -643,6 +644,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(KafkaConfig.LogPreAllocateProp, true.toString) props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString) props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000") + props.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, "1000") + props.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, "1000") props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000")) @@ -681,11 +684,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.clear() props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString) props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000") + props.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, "1000") + props.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, "1000") reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogMessageTimestampTypeProp, TimestampType.CREATE_TIME.toString)) consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.CREATE_TIME) // Verify that invalid configs are not applied val invalidProps = Map( KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid type + KafkaConfig.LogMessageTimestampBeforeMaxMsProp -> "abc", // Invalid type + KafkaConfig.LogMessageTimestampAfterMaxMsProp -> "abc", // Invalid type KafkaConfig.LogMessageTimestampTypeProp -> "invalid", // Invalid value KafkaConfig.LogRollTimeMillisProp -> "0" // Fails KafkaConfig validation ) diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index 1575542abfc..b52f4ec1e69 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -73,7 +73,6 @@ abstract class AbstractLogCleanerIntegrationTest { props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer) props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, minCleanableDirtyRatio: java.lang.Float) - props.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString) props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, minCompactionLagMs: java.lang.Long) props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, maxCompactionLagMs: java.lang.Long) props ++= propertyOverrides diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 8d5056c5ede..05aaa8ab062 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -54,7 +54,6 @@ class LogCleanerTest { logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString) val logConfig = new LogConfig(logProps) val time = new MockTime() val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index dd9b20289c4..f60635a3522 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -397,4 +397,22 @@ class LogConfigTest { LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) } } + + /* Verify that when the deprecated config LogMessageTimestampDifferenceMaxMsProp has non default value the new configs + * LogMessageTimestampBeforeMaxMsProp and LogMessageTimestampAfterMaxMsProp are not changed from the default we are using + * the deprecated config for backward compatibility. + * See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details */ + @nowarn("cat=deprecation") + @Test + def testTimestampBeforeMaxMsUsesDeprecatedConfig(): Unit = { + val oneDayInMillis = 24 * 60 * 60 * 1000L + val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + kafkaProps.put(KafkaConfig.LogMessageTimestampBeforeMaxMsProp, Long.MaxValue.toString) + kafkaProps.put(KafkaConfig.LogMessageTimestampAfterMaxMsProp, Long.MaxValue.toString) + kafkaProps.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, oneDayInMillis.toString) + + val logProps = KafkaConfig.fromProps(kafkaProps).extractLogConfigMap + assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)) + assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)) + } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index e1c593fbca5..5cab80b1864 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -55,7 +55,6 @@ class LogManagerTest { logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer) logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 4096: java.lang.Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, maxLogAgeMs: java.lang.Integer) - logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString) val logConfig = new LogConfig(logProps) var logDir: File = _ var logManager: LogManager = _ diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 4f54e219317..6b781f6fa69 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -123,6 +123,7 @@ class LogValidatorTest { magic, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PRODUCER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.IBP_2_3_IV1 @@ -154,6 +155,7 @@ class LogValidatorTest { magic, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -198,6 +200,7 @@ class LogValidatorTest { targetMagic, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -249,6 +252,7 @@ class LogValidatorTest { magic, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -314,6 +318,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -361,6 +366,7 @@ class LogValidatorTest { magic, TimestampType.CREATE_TIME, 1000L, + 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, MetadataVersion.latest @@ -438,6 +444,7 @@ class LogValidatorTest { magic, TimestampType.CREATE_TIME, 1000L, + 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, MetadataVersion.latest @@ -496,6 +503,7 @@ class LogValidatorTest { toMagic, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -543,6 +551,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest, @@ -602,6 +611,7 @@ class LogValidatorTest { magic, TimestampType.CREATE_TIME, 1000L, + 1000L, partitionLeaderEpoch, AppendOrigin.CLIENT, MetadataVersion.latest @@ -657,6 +667,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -682,6 +693,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -707,6 +719,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -732,6 +745,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -756,6 +770,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest, @@ -778,6 +793,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -801,6 +817,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -825,6 +842,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -850,6 +868,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -875,6 +894,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -898,6 +918,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -923,6 +944,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -948,6 +970,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -973,6 +996,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -998,6 +1022,7 @@ class LogValidatorTest { RecordBatch.CURRENT_MAGIC_VALUE, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1020,6 +1045,7 @@ class LogValidatorTest { RecordBatch.CURRENT_MAGIC_VALUE, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.COORDINATOR, MetadataVersion.latest @@ -1047,6 +1073,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1070,6 +1097,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1092,6 +1120,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1114,6 +1143,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1137,6 +1167,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1160,6 +1191,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1185,6 +1217,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1210,6 +1243,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1233,6 +1267,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1256,6 +1291,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1277,6 +1313,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1305,6 +1342,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V2, TimestampType.LOG_APPEND_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.IBP_2_0_IV1 @@ -1339,6 +1377,7 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V1, TimestampType.CREATE_TIME, 1000L, + 1000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest @@ -1398,6 +1437,73 @@ class LogValidatorTest { assertEquals(6, e.recordErrors.size) } + @Test + def testRecordWithPastTimestampIsRejected(): Unit = { + val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs + val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr + val now = System.currentTimeMillis() + val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 * 1000L) + val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesBeforeThreshold, + codec = CompressionType.GZIP) + val e = assertThrows(classOf[RecordValidationException], + () => new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + CompressionType.GZIP, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + timestampBeforeMaxConfig, + timestampAfterMaxConfig, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latest + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier + ) + ) + + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertFalse(e.recordErrors.isEmpty) + assertEquals(e.recordErrors.size, 3) + } + + + @Test + def testRecordWithFutureTimestampIsRejected(): Unit = { + val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs + val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr + val now = System.currentTimeMillis() + val fiveMinutesAfterThreshold = now + timestampAfterMaxConfig + (5 * 60 * 1000L) + val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesAfterThreshold, + codec = CompressionType.GZIP) + val e = assertThrows(classOf[RecordValidationException], + () => new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + CompressionType.GZIP, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + timestampBeforeMaxConfig, + timestampAfterMaxConfig, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latest + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier + ) + ) + + assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertFalse(e.recordErrors.isEmpty) + assertEquals(e.recordErrors.size, 3) + } + private def testBatchWithoutRecordsNotAllowed(sourceCompression: CompressionType, targetCompression: CompressionType): Unit = { val offset = 1234567 val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) = @@ -1417,6 +1523,7 @@ class LogValidatorTest { RecordBatch.CURRENT_MAGIC_VALUE, TimestampType.CREATE_TIME, 5000L, + 5000L, RecordBatch.NO_PARTITION_LEADER_EPOCH, AppendOrigin.CLIENT, MetadataVersion.latest diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 5136862192c..ea7ab2e293b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -761,6 +761,7 @@ class KafkaConfigTest { } @Test + @nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details def testFromPropsInvalid(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() @@ -856,6 +857,8 @@ class KafkaConfigTest { case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogMessageTimestampBeforeMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.LogMessageTimestampAfterMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") @@ -1094,6 +1097,10 @@ class KafkaConfigTest { assertDynamic(kafkaConfigProp, false, () => config.logMessageDownConversionEnable) case TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG => assertDynamic(kafkaConfigProp, 10009, () => config.logMessageTimestampDifferenceMaxMs) + case TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG => + assertDynamic(kafkaConfigProp, 10015L, () => config.logMessageTimestampBeforeMaxMs) + case TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG => + assertDynamic(kafkaConfigProp, 10016L, () => config.logMessageTimestampAfterMaxMs) case TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG => assertDynamic(kafkaConfigProp, "LogAppendTime", () => config.logMessageTimestampType.name) case TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG => diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index d02f59c0025..6ad22df3221 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -30,7 +30,10 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -78,12 +81,13 @@ class ProduceRequestTest extends BaseRequestTest { new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } - @Test - def testProduceWithInvalidTimestamp(): Unit = { + @ParameterizedTest + @MethodSource(Array("timestampConfigProvider")) + def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = { val topic = "topic" val partition = 0 val topicConfig = new Properties - topicConfig.setProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, "1000") + topicConfig.setProperty(messageTimeStampConfig, "1000") val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig) val leader = partitionToLeader(partition) @@ -96,7 +100,7 @@ class ProduceRequestTest extends BaseRequestTest { builder.build() } - val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP) + val records = createRecords(RecordBatch.MAGIC_VALUE_V2, recordTimestamp, CompressionType.GZIP) val topicPartition = new TopicPartition("topic", partition) val produceResponse = sendProduceRequest(leader, ProduceRequest.forCurrentMagic(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( @@ -250,3 +254,16 @@ class ProduceRequestTest extends BaseRequestTest { } } + +object ProduceRequestTest { + + @nowarn("cat=deprecation") // See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for deprecation details + def timestampConfigProvider: java.util.stream.Stream[Arguments] = { + val fiveMinutesInMs: Long = 5 * 60 * 60 * 1000L + java.util.stream.Stream.of[Arguments]( + Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)), + Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)), + Arguments.of(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() + fiveMinutesInMs)) + ) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 08cfffdb784..e4c831deebe 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -369,7 +369,6 @@ object TestUtils extends Logging { props.put(KafkaConfig.LogDeleteDelayMsProp, "1000") props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100") props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152") - props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp)) props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5") diff --git a/docs/upgrade.html b/docs/upgrade.html index a6901c42b77..13e76b79cc3 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -37,6 +37,9 @@ For more detailed information please refer to the Compatibility, Deprecation, and Migration Plan section in KIP-902. +
  • The configuration log.message.timestamp.difference.max.ms is deprecated. + Two new configurations, log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms, have been added. + For more detailed information, please refer to KIP-937.
  • Kafka Streams has introduced a new task assignor, RackAwareTaskAssignor, for computing task assignments which can minimize cross rack traffic under certain conditions. It works with existing StickyTaskAssignor and HighAvailabilityTaskAssignor. diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java index 878ab946877..e96ec08e1ab 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java @@ -53,7 +53,7 @@ public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBen MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate()); new LogValidator(records, new TopicPartition("a", 0), Time.SYSTEM, compressionType, compressionType, false, messageVersion, - TimestampType.CREATE_TIME, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, + TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, MetadataVersion.latest() ).validateMessagesAndAssignOffsetsCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder, requestLocal.bufferSupplier()); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java index d08b408e315..fda12efa6c4 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java @@ -49,7 +49,7 @@ public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchB MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate()); new LogValidator(records, new TopicPartition("a", 0), Time.SYSTEM, CompressionType.NONE, CompressionType.NONE, false, - messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, + messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, Long.MAX_VALUE, 0, AppendOrigin.CLIENT, MetadataVersion.latest() ).assignOffsetsNonCompressed(PrimitiveRef.ofLong(startingOffset), validatorMetricsRecorder); } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index ece3891a419..c53f8da84f9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -83,6 +83,8 @@ public final class ServerTopicConfigSynonyms { sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG), + sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG), + sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG), sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG), sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index ce2880865dd..f4b402b8e3b 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -175,7 +175,12 @@ public class LogConfig extends AbstractConfig { public static final String DEFAULT_COMPRESSION_TYPE = BrokerCompressionType.PRODUCER.name; public static final boolean DEFAULT_PREALLOCATE = false; public static final String DEFAULT_MESSAGE_TIMESTAMP_TYPE = "CreateTime"; + /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */ + @Deprecated public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = Long.MAX_VALUE; + + public static final long DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS = Long.MAX_VALUE; + public static final long DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS = Long.MAX_VALUE; public static final boolean DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE = true; public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false; @@ -192,9 +197,16 @@ public class LogConfig extends AbstractConfig { public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas"; public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas"; + /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ @SuppressWarnings("deprecation") private static final String MESSAGE_FORMAT_VERSION_CONFIG = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; + @SuppressWarnings("deprecation") + private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG; + + @SuppressWarnings("deprecation") + private static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC; + // Visible for testing public static final Set CONFIGS_WITH_NO_SERVER_DEFAULTS = Collections.unmodifiableSet(Utils.mkSet( TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, @@ -261,8 +273,12 @@ public class LogConfig extends AbstractConfig { MESSAGE_FORMAT_VERSION_DOC) .define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, DEFAULT_MESSAGE_TIMESTAMP_TYPE, in("CreateTime", "LogAppendTime"), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC) - .define(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS, - atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC) + .define(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS, + atLeast(0), MEDIUM, MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC) + .define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_BEFORE_MAX_MS, + atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC) + .define(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, DEFAULT_MESSAGE_TIMESTAMP_AFTER_MAX_MS, + atLeast(0), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC) .define(LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, DEFAULT_LEADER_REPLICATION_THROTTLED_REPLICAS, ThrottledReplicaListValidator.INSTANCE, MEDIUM, LEADER_REPLICATION_THROTTLED_REPLICAS_DOC) .define(FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, DEFAULT_FOLLOWER_REPLICATION_THROTTLED_REPLICAS, @@ -309,7 +325,12 @@ public class LogConfig extends AbstractConfig { public final MetadataVersion messageFormatVersion; public final TimestampType messageTimestampType; + + /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details regarding the deprecation */ + @Deprecated public final long messageTimestampDifferenceMaxMs; + public final long messageTimestampBeforeMaxMs; + public final long messageTimestampAfterMaxMs; public final List leaderReplicationThrottledReplicas; public final List followerReplicationThrottledReplicas; public final boolean messageDownConversionEnable; @@ -358,6 +379,8 @@ public class LogConfig extends AbstractConfig { this.messageFormatVersion = MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)); this.messageTimestampType = TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); this.messageTimestampDifferenceMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG); + this.messageTimestampBeforeMaxMs = getMessageTimestampBeforeMaxMs(); + this.messageTimestampAfterMaxMs = getMessageTimestampAfterMaxMs(); this.leaderReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)); this.followerReplicationThrottledReplicas = Collections.unmodifiableList(getList(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)); this.messageDownConversionEnable = getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG); @@ -365,6 +388,30 @@ public class LogConfig extends AbstractConfig { remoteLogConfig = new RemoteLogConfig(this); } + //In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility, + // we are using its value if messageTimestampBeforeMaxMs default value hasn't changed. + @SuppressWarnings("deprecation") + private long getMessageTimestampBeforeMaxMs() { + final Long messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG); + if (!messageTimestampBeforeMaxMs.equals(Long.MAX_VALUE)) { + return messageTimestampBeforeMaxMs; + } else { + return messageTimestampDifferenceMaxMs; + } + } + + //In the transition period before messageTimestampDifferenceMaxMs is removed, to maintain backward compatibility, + // we are using its value if messageTimestampAfterMaxMs default value hasn't changed. + @SuppressWarnings("deprecation") + private long getMessageTimestampAfterMaxMs() { + final Long messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG); + if (!messageTimestampAfterMaxMs.equals(Long.MAX_VALUE)) { + return messageTimestampAfterMaxMs; + } else { + return messageTimestampDifferenceMaxMs; + } + } + @SuppressWarnings("deprecation") public RecordVersion recordVersion() { return messageFormatVersion.highestSupportedRecordVersion(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index 640fe472a14..a05512a0196 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -102,7 +102,8 @@ public class LogValidator { private final boolean compactedTopic; private final byte toMagic; private final TimestampType timestampType; - private final long timestampDiffMaxMs; + private final long timestampBeforeMaxMs; + private final long timestampAfterMaxMs; private final int partitionLeaderEpoch; private final AppendOrigin origin; private final MetadataVersion interBrokerProtocolVersion; @@ -115,7 +116,8 @@ public class LogValidator { boolean compactedTopic, byte toMagic, TimestampType timestampType, - long timestampDiffMaxMs, + long timestampBeforeMaxMs, + long timestampAfterMaxMs, int partitionLeaderEpoch, AppendOrigin origin, MetadataVersion interBrokerProtocolVersion) { @@ -127,7 +129,8 @@ public class LogValidator { this.compactedTopic = compactedTopic; this.toMagic = toMagic; this.timestampType = timestampType; - this.timestampDiffMaxMs = timestampDiffMaxMs; + this.timestampBeforeMaxMs = timestampBeforeMaxMs; + this.timestampAfterMaxMs = timestampAfterMaxMs; this.partitionLeaderEpoch = partitionLeaderEpoch; this.origin = origin; this.interBrokerProtocolVersion = interBrokerProtocolVersion; @@ -208,7 +211,7 @@ public class LogValidator { int batchIndex = 0; for (Record record : batch) { Optional recordError = validateRecord(batch, topicPartition, - record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, + record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); recordError.ifPresent(e -> recordErrors.add(e)); // we fail the batch if any record fails, so we stop appending if any record fails @@ -257,7 +260,7 @@ public class LogValidator { int batchIndex = 0; for (Record record : batch) { Optional recordError = validateRecord(batch, topicPartition, record, - batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, metricsRecorder); + batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); recordError.ifPresent(e -> recordErrors.add(e)); long offset = offsetCounter.value++; @@ -368,7 +371,7 @@ public class LogValidator { batchIndex, record); if (!recordError.isPresent()) { recordError = validateRecord(batch, topicPartition, record, batchIndex, now, - timestampType, timestampDiffMaxMs, compactedTopic, metricsRecorder); + timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); } if (!recordError.isPresent() @@ -537,7 +540,8 @@ public class LogValidator { int batchIndex, long now, TimestampType timestampType, - long timestampDiffMaxMs, + long timestampBeforeMaxMs, + long timestampAfterMaxMs, boolean compactedTopic, MetricsRecorder metricsRecorder) { if (!record.hasMagic(batch.magic())) { @@ -566,7 +570,7 @@ public class LogValidator { if (keyError.isPresent()) return keyError; else - return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs); + return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs); } private static Optional validateKey(Record record, @@ -588,20 +592,29 @@ public class LogValidator { int batchIndex, long now, TimestampType timestampType, - long timestampDiffMaxMs) { - if (timestampType == TimestampType.CREATE_TIME - && record.timestamp() != RecordBatch.NO_TIMESTAMP - && Math.abs(record.timestamp() - now) > timestampDiffMaxMs) - return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex, - "Timestamp " + record.timestamp() + " of message with offset " + record.offset() - + " is out of range. The timestamp should be within [" + (now - timestampDiffMaxMs) - + ", " + (now + timestampDiffMaxMs) + "]"))); - else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME) + long timestampBeforeMaxMs, + long timestampAfterMaxMs) { + if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) { + if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) { + return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex, + "Timestamp " + record.timestamp() + " of message with offset " + record.offset() + + " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs) + + ", " + (now + timestampAfterMaxMs) + "]"))); + } + } else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME) return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex, "Invalid timestamp type in message " + record + ". Producer should not set timestamp " + "type to LogAppendTime."))); - else - return Optional.empty(); + return Optional.empty(); + } + + private static boolean recordHasInvalidTimestamp(Record record, + long now, + long timestampBeforeMaxMs, + long timestampAfterMaxMs) { + final long timestampDiff = now - record.timestamp(); + return timestampDiff > timestampBeforeMaxMs || + -1 * timestampDiff > timestampAfterMaxMs; } private static Optional validateRecordCompression(CompressionType sourceCompression,