From b4be1785998d382843002ffd8685d67fe8d9c97d Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Sun, 22 Dec 2024 07:35:15 +0800 Subject: [PATCH] KAFKA-17393: Remove log.message.format.version/message.format.version (KIP-724) (#18267) Based on [KIP-724](https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1), the `log.message.format.version` and `message.format.version` can be removed in 4.0. These configs effectively a no-op with inter-broker protocol version 3.0 or higher since Apache Kafka 3.0, so the impact should be minimal. Reviewers: Ismael Juma --- .../kafka/common/config/TopicConfig.java | 20 -- .../common/runtime/CoordinatorRuntime.java | 3 +- .../scala/kafka/admin/ConfigCommand.scala | 10 +- .../group/GroupMetadataManager.scala | 214 +++++++++--------- .../src/main/scala/kafka/log/LogManager.scala | 19 +- .../src/main/scala/kafka/log/UnifiedLog.scala | 43 +--- .../scala/kafka/server/ConfigHandler.scala | 29 +-- .../kafka/server/DynamicBrokerConfig.scala | 22 +- .../main/scala/kafka/server/KafkaApis.scala | 9 +- .../main/scala/kafka/server/KafkaConfig.scala | 33 --- .../scala/kafka/server/ReplicaManager.scala | 2 - ...ribeTopicPartitionsRequestHandlerTest.java | 2 +- ...thLegacyMessageFormatIntegrationTest.scala | 25 +- .../kafka/cluster/PartitionLockTest.scala | 2 +- .../unit/kafka/cluster/PartitionTest.scala | 2 +- ...PartitionWithLegacyMessageFormatTest.scala | 66 ------ .../AbstractCoordinatorConcurrencyTest.scala | 5 +- .../group/GroupCoordinatorTest.scala | 32 +-- .../group/GroupMetadataManagerTest.scala | 67 +++--- .../TransactionStateManagerTest.scala | 2 - .../kafka/log/LogCleanerManagerTest.scala | 2 +- ...gCleanerParameterizedIntegrationTest.scala | 120 +--------- .../scala/unit/kafka/log/LogCleanerTest.scala | 2 +- .../scala/unit/kafka/log/LogConfigTest.scala | 8 - .../scala/unit/kafka/log/LogLoaderTest.scala | 196 +--------------- .../scala/unit/kafka/log/UnifiedLogTest.scala | 44 ---- .../server/DynamicBrokerConfigTest.scala | 5 - .../unit/kafka/server/KafkaApisTest.scala | 91 ++------ .../unit/kafka/server/KafkaConfigTest.scala | 36 +-- .../kafka/server/ReplicaManagerTest.scala | 2 +- .../unit/kafka/utils/SchedulerTest.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 7 +- docs/upgrade.html | 2 + .../kafka/server/config/ServerLogConfigs.java | 14 -- .../config/ServerTopicConfigSynonyms.java | 2 - .../kafka/storage/internals/log/LocalLog.java | 6 - .../storage/internals/log/LogConfig.java | 69 +----- .../storage/internals/log/LogLoader.java | 2 - .../storage/internals/log/UnifiedLog.java | 16 +- .../org/apache/kafka/tools/TopicCommand.java | 5 - 40 files changed, 214 insertions(+), 1024 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/cluster/PartitionWithLegacyMessageFormatTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 56279421ff2..fb51d254cdd 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -198,26 +198,6 @@ public class TopicConfig { public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " + "creating a new log segment."; - /** - * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate - * for most situations. - */ - @Deprecated - public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version"; - - /** - * @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate - * for most situations. - */ - @Deprecated - public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " + - "will use to append messages to the logs. The value of this config is always assumed to be `3.0` if " + - "`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should " + - "be a valid ApiVersion. Some examples are: 0.10.0, 1.1, 2.8, 3.0. By setting a particular message format version, the " + - "user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting " + - "this value incorrectly will cause consumers with older versions to break as they will receive messages with a format " + - "that they don't understand."; - public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type"; public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " + "message create time or log append time."; diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index c08e20b98cd..5d9e9ef13a8 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -857,14 +857,13 @@ public class CoordinatorRuntime, U> implements Aut ) { if (currentBatch == null) { LogConfig logConfig = partitionWriter.config(tp); - byte magic = logConfig.recordVersion().value; int maxBatchSize = logConfig.maxMessageSize(); long prevLastWrittenOffset = coordinator.lastWrittenOffset(); ByteBuffer buffer = bufferSupplier.get(maxBatchSize); MemoryRecordsBuilder builder = new MemoryRecordsBuilder( buffer, - magic, + RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L, diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index fe1fe9faed3..aa38a6c85c4 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -25,7 +25,7 @@ import kafka.server.DynamicConfig import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} -import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.ApiKeys @@ -36,7 +36,6 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.LogConfig -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection._ @@ -95,8 +94,6 @@ object ConfigCommand extends Logging { } } - - @nowarn("cat=deprecation") def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = { val props = new Properties if (opts.options.has(opts.addConfigFile)) { @@ -115,11 +112,6 @@ object ConfigCommand extends Logging { //Create properties, parsing square brackets from values if necessary configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim)) } - if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) { - System.out.println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " + - "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " + - "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.") - } validatePropsKey(props) props } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 653430f7304..383f9dbf66d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.function.Supplier import com.yammer.metrics.core.Gauge +import kafka.cluster.Partition import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock @@ -240,79 +241,78 @@ class GroupMetadataManager(brokerId: Int, groupAssignment: Map[String, Array[Byte]], responseCallback: Errors => Unit, requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { - getMagic(partitionFor(group.groupId)) match { - case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. - val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() - val key = GroupMetadataManager.groupMetadataKey(group.groupId) - val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion) + if (onlinePartition(partitionFor(group.groupId)).isDefined) { + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() + val key = GroupMetadataManager.groupMetadataKey(group.groupId) + val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion) - val records = { - val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compression.`type`(), - Seq(new SimpleRecord(timestamp, key, value)).asJava)) - val builder = MemoryRecords.builder(buffer, magicValue, compression, timestampType, 0L) - builder.append(timestamp, key, value) - builder.build() - } + val records = { + val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(RecordBatch.CURRENT_MAGIC_VALUE, compression.`type`(), + Seq(new SimpleRecord(timestamp, key, value)).asJava)) + val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, timestampType, 0L) + builder.append(timestamp, key, value) + builder.build() + } - val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) - val groupMetadataRecords = Map(groupMetadataPartition -> records) - val generationId = group.generationId + val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) + val groupMetadataRecords = Map(groupMetadataPartition -> records) + val generationId = group.generationId - // set the callback function to insert the created group into cache after log append completed - def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - // the append response should only contain the topics partition - if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition)) - throw new IllegalStateException("Append status %s should only have one partition %s" - .format(responseStatus, groupMetadataPartition)) + // set the callback function to insert the created group into cache after log append completed + def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { + // the append response should only contain the topics partition + if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition)) + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, groupMetadataPartition)) - // construct the error status in the propagated assignment response in the cache - val status = responseStatus(groupMetadataPartition) + // construct the error status in the propagated assignment response in the cache + val status = responseStatus(groupMetadataPartition) - val responseError = if (status.error == Errors.NONE) { - Errors.NONE - } else { - debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " + - s"due to ${status.error.exceptionName}") + val responseError = if (status.error == Errors.NONE) { + Errors.NONE + } else { + debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " + + s"due to ${status.error.exceptionName}") - // transform the log append error code to the corresponding the commit status error code - status.error match { - case Errors.UNKNOWN_TOPIC_OR_PARTITION - | Errors.NOT_ENOUGH_REPLICAS - | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => - Errors.COORDINATOR_NOT_AVAILABLE + // transform the log append error code to the corresponding the commit status error code + status.error match { + case Errors.UNKNOWN_TOPIC_OR_PARTITION + | Errors.NOT_ENOUGH_REPLICAS + | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => + Errors.COORDINATOR_NOT_AVAILABLE - case Errors.NOT_LEADER_OR_FOLLOWER - | Errors.KAFKA_STORAGE_ERROR => - Errors.NOT_COORDINATOR + case Errors.NOT_LEADER_OR_FOLLOWER + | Errors.KAFKA_STORAGE_ERROR => + Errors.NOT_COORDINATOR - case Errors.REQUEST_TIMED_OUT => - Errors.REBALANCE_IN_PROGRESS + case Errors.REQUEST_TIMED_OUT => + Errors.REBALANCE_IN_PROGRESS - case Errors.MESSAGE_TOO_LARGE - | Errors.RECORD_LIST_TOO_LARGE - | Errors.INVALID_FETCH_SIZE => + case Errors.MESSAGE_TOO_LARGE + | Errors.RECORD_LIST_TOO_LARGE + | Errors.INVALID_FETCH_SIZE => - error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " + - s"${status.error.exceptionName}, returning UNKNOWN error code to the client") + error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " + + s"${status.error.exceptionName}, returning UNKNOWN error code to the client") - Errors.UNKNOWN_SERVER_ERROR + Errors.UNKNOWN_SERVER_ERROR - case other => - error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " + - s"due to unexpected error: ${status.error.exceptionName}") + case other => + error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " + + s"due to unexpected error: ${status.error.exceptionName}") - other - } + other } - - responseCallback(responseError) } - appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback) - case None => - responseCallback(Errors.NOT_COORDINATOR) + responseCallback(responseError) + } + + appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback) + } else { + responseCallback(Errors.NOT_COORDINATOR) } } @@ -464,8 +464,7 @@ class GroupMetadataManager(brokerId: Int, return } - val magicOpt = getMagic(partitionFor(group.groupId)) - if (magicOpt.isEmpty) { + if (onlinePartition(partitionFor(group.groupId)).isEmpty) { val commitStatus = offsetMetadata.map { case (topicIdPartition, _) => (topicIdPartition, Errors.NOT_COORDINATOR) } @@ -474,7 +473,7 @@ class GroupMetadataManager(brokerId: Int, } val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID - val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch) + val records = generateOffsetRecords(RecordBatch.CURRENT_MAGIC_VALUE, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch) val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records) val verificationGuards = verificationGuard.map(guard => offsetTopicPartition -> guard).toMap @@ -868,56 +867,55 @@ class GroupMetadataManager(brokerId: Int, (removedOffsets, group.is(Dead), group.generationId) } - val offsetsPartition = partitionFor(groupId) - val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) - getMagic(offsetsPartition) match { - case Some(magicValue) => - // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. - val timestampType = TimestampType.CREATE_TIME - val timestamp = time.milliseconds() + val offsetsPartition = partitionFor(groupId) + val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) + onlinePartition(offsetsPartition) match { + case Some(partition) => + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() - replicaManager.onlinePartition(appendPartition).foreach { partition => - val tombstones = ArrayBuffer.empty[SimpleRecord] - removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) => - trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") - val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) - tombstones += new SimpleRecord(timestamp, commitKey, null) - } - trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.") + val tombstones = ArrayBuffer.empty[SimpleRecord] + removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) => + trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") + val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) + tombstones += new SimpleRecord(timestamp, commitKey, null) + } + trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.") - // We avoid writing the tombstone when the generationId is 0, since this group is only using - // Kafka for offset storage. - if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) { - // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, - // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and - // retry removing this group. - val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId) - tombstones += new SimpleRecord(timestamp, groupMetadataKey, null) - trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.") - } + // We avoid writing the tombstone when the generationId is 0, since this group is only using + // Kafka for offset storage. + if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) { + // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, + // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and + // retry removing this group. + val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId) + tombstones += new SimpleRecord(timestamp, groupMetadataKey, null) + trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.") + } - if (tombstones.nonEmpty) { - try { - // do not need to require acks since even if the tombstone is lost, - // it will be appended again in the next purge cycle - val records = MemoryRecords.withRecords(magicValue, 0L, compression, timestampType, tombstones.toArray: _*) - partition.appendRecordsToLeader(records, origin = AppendOrigin.COORDINATOR, requiredAcks = 0, - requestLocal = requestLocal) + if (tombstones.nonEmpty) { + try { + // do not need to require acks since even if the tombstone is lost, + // it will be appended again in the next purge cycle + val records = MemoryRecords.withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compression, timestampType, tombstones.toArray: _*) + partition.appendRecordsToLeader(records, origin = AppendOrigin.COORDINATOR, requiredAcks = 0, + requestLocal = requestLocal) - offsetsRemoved += removedOffsets.size - trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " + - s"offsets and/or metadata for group $groupId") - } catch { - case t: Throwable => - error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " + - s"offsets and/or metadata for group $groupId.", t) - // ignore and continue - } + offsetsRemoved += removedOffsets.size + trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " + + s"offsets and/or metadata for group $groupId") + } catch { + case t: Throwable => + error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " + + s"offsets and/or metadata for group $groupId.", t) + // ignore and continue } } - case None => - info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups") + + case None => + info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups") } } @@ -1002,14 +1000,8 @@ class GroupMetadataManager(brokerId: Int, // TODO: clear the caches } - /** - * Check if the replica is local and return the message format version - * - * @param partition Partition of GroupMetadataTopic - * @return Some(MessageFormatVersion) if replica is local, None otherwise - */ - private def getMagic(partition: Int): Option[Byte] = - replicaManager.getMagic(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)) + private def onlinePartition(partition: Int): Option[Partition] = + replicaManager.onlinePartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)) /** * Add a partition to the owned partition set. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a0e68364138..c9e771a5a55 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -35,14 +35,12 @@ import scala.jdk.CollectionConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest} import org.apache.kafka.image.TopicsImage import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils} import java.util.{Collections, OptionalLong, Properties} import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.{FileLock, Scheduler} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache} @@ -50,7 +48,6 @@ import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.util -import scala.annotation.nowarn /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -583,27 +580,13 @@ class LogManager(logDirs: Seq[File], } // visible for testing - @nowarn("cat=deprecation") private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = { val topicConfigOverrides = mutable.Map[String, LogConfig]() val defaultProps = defaultConfig.originals() topicNames.foreach { topicName => - var overrides = configRepository.topicConfig(topicName) + val overrides = configRepository.topicConfig(topicName) // save memory by only including configs for topics with overrides if (!overrides.isEmpty) { - Option(overrides.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).foreach { versionString => - val messageFormatVersion = new MessageFormatVersion(versionString, interBrokerProtocolVersion.version) - if (messageFormatVersion.shouldIgnore) { - val copy = new Properties() - copy.putAll(overrides) - copy.remove(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - overrides = copy - - if (messageFormatVersion.shouldWarn) - warn(messageFormatVersion.topicWarningMessage(topicName)) - } - } - val logConfig = LogConfig.fromProps(defaultProps, overrides) topicConfigOverrides(topicName) = logConfig } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 7c697ac4a52..3f129deec7f 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch, RequestLocal} -import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.record.BrokerCompressionType @@ -49,7 +48,6 @@ import java.util import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, ScheduledFuture} import java.util.stream.Collectors import java.util.{Collections, Optional, OptionalInt, OptionalLong} -import scala.annotation.nowarn import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ @@ -254,10 +252,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, def updateConfig(newConfig: LogConfig): LogConfig = { val oldConfig = localLog.config localLog.updateConfig(newConfig) - val oldRecordVersion = oldConfig.recordVersion - val newRecordVersion = newConfig.recordVersion - if (newRecordVersion != oldRecordVersion) - initializeLeaderEpochCache() oldConfig } @@ -480,8 +474,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, updateHighWatermark(localLog.logEndOffsetMetadata) } - private def recordVersion: RecordVersion = config.recordVersion - private def initializePartitionMetadata(): Unit = lock synchronized { val partitionMetadata = PartitionMetadataFile.newFile(dir) partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)) @@ -518,7 +510,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def initializeLeaderEpochCache(): Unit = lock synchronized { leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler) + dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache, scheduler) } private def updateHighWatermarkWithLogEndOffset(): Unit = { @@ -552,7 +544,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def rebuildProducerState(lastOffset: Long, producerStateManager: ProducerStateManager): Unit = lock synchronized { localLog.checkIfMemoryMappedBufferClosed() - JUnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time, false, logIdent) + JUnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, time, false, logIdent) } @threadsafe @@ -795,7 +787,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, appendInfo.sourceCompression, targetCompression, config.compact, - config.recordVersion.value, + RecordBatch.CURRENT_MAGIC_VALUE, config.messageTimestampType, config.messageTimestampBeforeMaxMs, config.messageTimestampAfterMaxMs, @@ -1269,18 +1261,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, *
  • All special timestamp offset results are returned immediately irrespective of the remote storage. * */ - @nowarn("cat=deprecation") def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = { maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { debug(s"Searching offset for timestamp $targetTimestamp") - if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && - targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && - targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) - throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + - s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + - s"required version $IBP_0_10_0_IV0") - // For the earliest and latest, we do not need to return the timestamp. if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP || (!remoteLogEnabled() && targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) { @@ -1354,9 +1338,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (remoteLogManager.isEmpty) { throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.") } - if (recordVersion.value < RecordVersion.V2.value) { - throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.") - } val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())) @@ -2038,7 +2019,6 @@ object UnifiedLog extends Logging { dir, topicPartition, logDirFailureChannel, - config.recordVersion, s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ", None, scheduler) @@ -2100,7 +2080,6 @@ object UnifiedLog extends Logging { * @param dir The directory in which the log will reside * @param topicPartition The topic partition * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure - * @param recordVersion The record version * @param logPrefix The logging prefix * @param currentCache The current LeaderEpochFileCache instance (if any) * @param scheduler The scheduler for executing asynchronous tasks @@ -2109,23 +2088,13 @@ object UnifiedLog extends Logging { def maybeCreateLeaderEpochCache(dir: File, topicPartition: TopicPartition, logDirFailureChannel: LogDirFailureChannel, - recordVersion: RecordVersion, logPrefix: String, currentCache: Option[LeaderEpochFileCache], scheduler: Scheduler): Option[LeaderEpochFileCache] = { val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) - - if (recordVersion.precedes(RecordVersion.V2)) { - if (leaderEpochFile.exists()) { - warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") - } - Files.deleteIfExists(leaderEpochFile.toPath) - None - } else { - val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) - currentCache.map(_.withCheckpoint(checkpointFile)) - .orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler))) - } + val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) + currentCache.map(_.withCheckpoint(checkpointFile)) + .orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler))) } private[log] def replaceSegments(existingSegments: LogSegments, diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index d8cf895677d..e1bc4f9b393 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -25,7 +25,6 @@ import kafka.network.ConnectionQuotas import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Logging import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs, ZooKeeperInternals} -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.Quota._ import org.apache.kafka.common.utils.Sanitizer @@ -34,9 +33,7 @@ import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.common.StopPartition import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator} -import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.util.Try @@ -60,19 +57,13 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, private def updateLogConfig(topic: String, topicConfig: Properties): Unit = { val logManager = replicaManager.logManager - // Validate the configurations. - val configNamesToExclude = excludedConfigs(topic, topicConfig) - val props = new Properties() - topicConfig.asScala.foreachEntry { (key, value) => - if (!configNamesToExclude.contains(key)) props.put(key, value) - } val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) // kafkaController is only defined in Zookeeper's mode - logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), wasRemoteLogEnabled, kafkaController.isDefined) maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled) } @@ -158,24 +149,6 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, .map(_ (0).toInt).toSeq //convert to list of partition ids } } - - @nowarn("cat=deprecation") - private def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = { - // Verify message format version - Option(topicConfig.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).flatMap { versionString => - val messageFormatVersion = new MessageFormatVersion(versionString, kafkaConfig.interBrokerProtocolVersion.version) - if (messageFormatVersion.shouldIgnore) { - if (messageFormatVersion.shouldWarn) - warn(messageFormatVersion.topicWarningMessage(topic)) - Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - } else if (kafkaConfig.interBrokerProtocolVersion.isLessThan(messageFormatVersion.messageFormatVersion)) { - warn(s"Topic configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG} is ignored for `$topic` because `$versionString` " + - s"is higher than what is allowed by the inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`") - Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - } else - None - }.toSet - } } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 02fbf7eb2f0..0fc7b8a49e8 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging} import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.internals.BrokerSecurityConfigs -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} import org.apache.kafka.common.metrics.{Metrics, MetricsReporter} import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} @@ -662,24 +662,13 @@ trait BrokerReconfigurable { } object DynamicLogConfig { - /** - * The log configurations that are non-reconfigurable. This set contains the names you - * would use when setting a dynamic configuration on a topic, which are different than the - * corresponding broker configuration names. - * - * For now, message.format.version is not reconfigurable, since we need to check that - * the version is supported on all brokers in the cluster. - */ - val NonReconfigrableLogConfigs: Set[String] = Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) - /** * The broker configurations pertaining to logs that are reconfigurable. This set contains * the names you would use when setting a static or dynamic broker configuration (not topic * configuration). */ val ReconfigurableConfigs: Set[String] = - ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala. - filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet + ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.values.toSet } class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging { @@ -745,13 +734,6 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok val originalLogConfig = logManager.currentDefaultConfig val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap) - val originalLogConfigMap = originalLogConfig.originals() - DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => { - Option(originalLogConfigMap.get(k)) match { - case None => newBrokerDefaults.remove(k) - case Some(v) => newBrokerDefaults.put(k, v) - } - }) logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 04a4f18087d..c1e8807de9d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2251,12 +2251,9 @@ class KafkaApis(val requestChannel: RequestChannel, val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]() marker.partitions.forEach { partition => - replicaManager.getMagic(partition) match { - case Some(magic) => - if (magic < RecordBatch.MAGIC_VALUE_V2) - currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) - else - partitionsWithCompatibleMessageFormat += partition + replicaManager.onlinePartition(partition) match { + case Some(_) => + partitionsWithCompatibleMessageFormat += partition case None => currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4eea07f4dbd..513c1273d16 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -50,10 +50,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} -import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion import org.apache.zookeeper.client.ZKClientConfig -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq} import scala.jdk.OptionConverters.RichOptional @@ -475,18 +473,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG) def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT) - // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0` - // is passed, `0.10.0-IV0` may be picked) - @nowarn("cat=deprecation") - private val logMessageFormatVersionString = getString(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG) - - /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ - @deprecated("3.0") - lazy val logMessageFormatVersion = - if (LogConfig.shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion)) - MetadataVersion.fromVersionString(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT) - else MetadataVersion.fromVersionString(logMessageFormatVersionString) - def logMessageTimestampType = TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG)) def logMessageTimestampBeforeMaxMs: Long = getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG) @@ -789,7 +775,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) validateValues() - @nowarn("cat=deprecation") private def validateValues(): Unit = { if (nodeId != brokerId) { throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.") @@ -946,15 +931,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"Currently they both have the value ${controlPlaneListenerName.get}") } - val messageFormatVersion = new MessageFormatVersion(logMessageFormatVersionString, interBrokerProtocolVersionString) - if (messageFormatVersion.shouldWarn) - warn(createBrokerWarningMessage) - - val recordVersion = logMessageFormatVersion.highestSupportedRecordVersion - require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= recordVersion.value, - s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + - s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher") - if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD) require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value, "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " + @@ -1004,7 +980,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) * Copy the subset of properties that are relevant to Logs. The individual properties * are listed here since the names are slightly different in each Config class... */ - @nowarn("cat=deprecation") def extractLogConfigMap: java.util.Map[String, Object] = { val logProps = new java.util.HashMap[String, Object]() logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes) @@ -1030,7 +1005,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel) logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable) logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable) - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, logMessageFormatVersion.version) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long) @@ -1038,11 +1012,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long) logProps } - - @nowarn("cat=deprecation") - private def createBrokerWarningMessage: String = { - s"Broker configuration ${ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG} with value $logMessageFormatVersionString is ignored " + - s"because the inter-broker protocol version `$interBrokerProtocolVersionString` is greater or equal than 3.0. " + - "This configuration is deprecated and it will be removed in Apache Kafka 4.0." - } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6b3b8c95175..cfce9aaf8aa 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2046,8 +2046,6 @@ class ReplicaManager(val config: KafkaConfig, def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config) - def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.recordVersion.value) - def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = { replicaStateChangeLock synchronized { if (updateMetadataRequest.controllerEpoch < controllerEpoch) { diff --git a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java index 643d9d333f5..ae98ca4b55f 100644 --- a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java +++ b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java @@ -559,7 +559,7 @@ class DescribeTopicPartitionsRequestHandlerTest { int voterId = brokerId + 1; properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093"); properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL"); - TestUtils.setIbpAndMessageFormatVersions(properties, MetadataVersion.latestProduction()); + TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction()); return new KafkaConfig(properties); } } \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala index ab4cd82d6f6..e3c8b2b85a1 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala @@ -17,21 +17,17 @@ package kafka.api import kafka.utils.TestInfoUtils -import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.TopicConfig import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import java.util -import java.util.{Collections, Optional, Properties} -import scala.annotation.nowarn +import java.util.{Collections, Optional} import scala.jdk.CollectionConverters._ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTest { - @nowarn("cat=deprecation") @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testOffsetsForTimes(quorum: String, groupProtocol: String): Unit = { @@ -39,11 +35,8 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes val topic1 = "part-test-topic-1" val topic2 = "part-test-topic-2" val topic3 = "part-test-topic-3" - val props = new Properties() - props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") createTopic(topic1, numParts) - // Topic2 is in old message format. - createTopic(topic2, numParts, 1, props) + createTopic(topic2, numParts) createTopic(topic3, numParts) val consumer = createConsumer() @@ -102,20 +95,14 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes assertNull(timestampOffsets.get(new TopicPartition(topic3, 1))) } - @nowarn("cat=deprecation") @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testEarliestOrLatestOffsets(quorum: String, groupProtocol: String): Unit = { - val topic0 = "topicWithNewMessageFormat" - val topic1 = "topicWithOldMessageFormat" - val prop = new Properties() - // idempotence producer doesn't support old version of messages - prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false") - val producer = createProducer(configOverrides = prop) + val topic0 = "topic0" + val topic1 = "topic1" + val producer = createProducer() createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100) - val props = new Properties() - props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") - createTopic(topic1, numPartitions = 1, replicationFactor = 1, props) + createTopic(topic1) sendRecords(producer, numRecords = 100, new TopicPartition(topic1, 0)) val t0p0 = new TopicPartition(topic0, 0) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 179e098c347..8bdc80e9a4f 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -303,7 +303,7 @@ class PartitionLockTest extends Logging { val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, mockTime.scheduler) + log.dir, log.topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val maxTransactionTimeout = 5 * 60 * 1000 val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) val producerStateManager = new ProducerStateManager( diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index bc67a93b1be..f21522067ca 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -448,7 +448,7 @@ class PartitionTest extends AbstractPartitionTest { val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, time.scheduler) + log.dir, log.topicPartition, logDirFailureChannel, "", None, time.scheduler) val maxTransactionTimeoutMs = 5 * 60 * 1000 val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true) val producerStateManager = new ProducerStateManager( diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionWithLegacyMessageFormatTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionWithLegacyMessageFormatTest.scala deleted file mode 100644 index 8185781c135..00000000000 --- a/core/src/test/scala/unit/kafka/cluster/PartitionWithLegacyMessageFormatTest.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.cluster - -import kafka.utils.TestUtils -import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.record.{RecordVersion, SimpleRecord} -import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test - -import java.util.Optional -import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1 - -import scala.annotation.nowarn - -class PartitionWithLegacyMessageFormatTest extends AbstractPartitionTest { - - // legacy message formats are only supported with IBP < 3.0 - override protected def interBrokerProtocolVersion: MetadataVersion = IBP_2_8_IV1 - - @nowarn("cat=deprecation") - @Test - def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = { - val leaderEpoch = 8 - configRepository.setTopicConfig(topicPartition.topic(), - TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, MetadataVersion.IBP_0_10_2_IV0.shortVersion) - val log = logManager.getOrCreateLog(topicPartition, topicId = None) - log.appendAsLeader(TestUtils.records(List( - new SimpleRecord("k1".getBytes, "v1".getBytes), - new SimpleRecord("k2".getBytes, "v2".getBytes)), - magicValue = RecordVersion.V1.value - ), leaderEpoch = 0) - log.appendAsLeader(TestUtils.records(List( - new SimpleRecord("k3".getBytes, "v3".getBytes), - new SimpleRecord("k4".getBytes, "v4".getBytes)), - magicValue = RecordVersion.V1.value - ), leaderEpoch = 5) - assertEquals(4, log.logEndOffset) - - val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true) - assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset)) - assertEquals(None, log.latestEpoch) - - val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of(leaderEpoch), - leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true) - assertEquals(UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset) - assertEquals(UNDEFINED_EPOCH, epochEndOffset.leaderEpoch) - } - -} diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index e72f1986f46..b567295809f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -22,6 +22,7 @@ import java.util.{Collections, Random} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.Lock import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ +import kafka.cluster.Partition import kafka.log.{LogManager, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers import kafka.server.{KafkaConfig, _} @@ -253,8 +254,8 @@ object AbstractCoordinatorConcurrencyTest { producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.toList.asJava) } - override def getMagic(topicPartition: TopicPartition): Option[Byte] = { - Some(RecordBatch.MAGIC_VALUE_V2) + override def onlinePartition(topicPartition: TopicPartition): Option[Partition] = { + Some(mock(classOf[Partition])) } def getOrCreateLogs(): mutable.Map[TopicPartition, (UnifiedLog, Long)] = { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 433047275e2..086f191de0b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -416,6 +416,8 @@ class GroupCoordinatorTest { } // advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin + when(replicaManager.onlinePartition(any[TopicPartition])) + .thenReturn(Some(mock(classOf[Partition]))) timer.advanceClock(DefaultRebalanceTimeout + 1) // Awaiting results @@ -636,8 +638,8 @@ class GroupCoordinatorTest { } private def verifySessionExpiration(groupId: String): Unit = { - when(replicaManager.getMagic(any[TopicPartition])) - .thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any[TopicPartition])) + .thenReturn(Some(mock(classOf[Partition]))) timer.advanceClock(DefaultSessionTimeout + 1) @@ -1591,6 +1593,7 @@ class GroupCoordinatorTest { assertEquals(newGeneration, followerJoinGroupResult.generationId) val leaderId = leaderJoinGroupResult.memberId + when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition]))) val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]())) assertEquals(Errors.NONE, leaderSyncGroupResult.error) assertTrue(getGroup(groupId).is(Stable)) @@ -1749,7 +1752,6 @@ class GroupCoordinatorTest { when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))) .thenReturn(HostedPartition.None) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) timer.advanceClock(DefaultSessionTimeout + 100) @@ -2061,8 +2063,6 @@ class GroupCoordinatorTest { assertEquals(1, group.numPending) assertEquals(Stable, group.currentState) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) - // advance clock to timeout the pending member assertEquals(Set(firstMemberId), group.allMembers) assertEquals(1, group.numPending) @@ -2155,7 +2155,6 @@ class GroupCoordinatorTest { // Advancing Clock by > 100 (session timeout for third and fourth member) // and < 500 (for first and second members). This will force the coordinator to attempt join // completion on heartbeat expiration (since we are in PendingRebalance stage). - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) timer.advanceClock(120) assertGroupState(groupState = CompletingRebalance) @@ -2230,8 +2229,8 @@ class GroupCoordinatorTest { } // Advance part the rebalance timeout to trigger the delayed operation. - when(replicaManager.getMagic(any[TopicPartition])) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + when(replicaManager.onlinePartition(any[TopicPartition])) + .thenReturn(Some(mock(classOf[Partition]))) timer.advanceClock(DefaultRebalanceTimeout / 2 + 1) @@ -2618,7 +2617,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3514,7 +3512,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3549,7 +3546,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3609,7 +3605,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3690,7 +3685,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3733,7 +3727,6 @@ class GroupCoordinatorTest { val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val partition: Partition = mock(classOf[Partition]) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition)) when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) @@ -3928,8 +3921,6 @@ class GroupCoordinatorTest { supportSkippingAssignment: Boolean = true): Future[JoinGroupResult] = { val (responseFuture, responseCallback) = setupJoinGroupCallback - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) - groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId, requireKnownMemberId, supportSkippingAssignment, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback) responseFuture @@ -3967,7 +3958,6 @@ class GroupCoordinatorTest { ) ) }) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) groupCoordinator.handleJoinGroup(groupId, memberId, Some(groupInstanceId), requireKnownMemberId, supportSkippingAssignment, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback) @@ -4004,7 +3994,7 @@ class GroupCoordinatorTest { ) } ) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition]))) groupCoordinator.handleSyncGroup(groupId, generation, leaderId, protocolType, protocolName, groupInstanceId, assignment, responseCallback) @@ -4150,7 +4140,7 @@ class GroupCoordinatorTest { ) ) }) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition]))) groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) @@ -4208,7 +4198,7 @@ class GroupCoordinatorTest { ) ) }) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition]))) groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch, memberId, groupInstanceId, generationId, offsets, responseCallback, RequestLocal.noCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) @@ -4232,7 +4222,7 @@ class GroupCoordinatorTest { when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))) .thenReturn(HostedPartition.None) - when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) + when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition]))) groupCoordinator.handleLeaveGroup(groupId, memberIdentities, responseCallback) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 214295b332c..6033b312b60 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1189,7 +1189,7 @@ class GroupMetadataManagerTest { any(), any(), any()) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test @@ -1227,12 +1227,12 @@ class GroupMetadataManagerTest { any(), any(), any()) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test def testStoreNonEmptyGroupWhenCoordinatorHasMoved(): Unit = { - when(replicaManager.getMagic(any())).thenReturn(None) + when(replicaManager.onlinePartition(any())).thenReturn(None) val memberId = "memberId" val clientId = "clientId" val clientHost = "localhost" @@ -1253,7 +1253,7 @@ class GroupMetadataManagerTest { groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), callback) assertEquals(Some(Errors.NOT_COORDINATOR), maybeError) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test @@ -1326,7 +1326,7 @@ class GroupMetadataManagerTest { val offsets = immutable.Map(topicIdPartition -> offsetAndMetadata) val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { commitErrors = Some(errors) @@ -1348,7 +1348,7 @@ class GroupMetadataManagerTest { any(), any(), ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) @@ -1377,7 +1377,7 @@ class GroupMetadataManagerTest { val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration)) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { @@ -1410,7 +1410,7 @@ class GroupMetadataManagerTest { any(), any(), ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test @@ -1429,7 +1429,7 @@ class GroupMetadataManagerTest { val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration)) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { @@ -1462,12 +1462,12 @@ class GroupMetadataManagerTest { any(), any(), ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test def testCommitOffsetWhenCoordinatorHasMoved(): Unit = { - when(replicaManager.getMagic(any())).thenReturn(None) + when(replicaManager.onlinePartition(any())).thenReturn(None) val memberId = "" val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") val offset = 37 @@ -1491,7 +1491,7 @@ class GroupMetadataManagerTest { val maybeError = commitErrors.get.get(topicIdPartition) assertEquals(Some(Errors.NOT_COORDINATOR), maybeError) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) } @Test @@ -1520,7 +1520,7 @@ class GroupMetadataManagerTest { val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration)) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { @@ -1549,7 +1549,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset) ) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) // Will not update sensor if failed assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } @@ -1573,7 +1573,7 @@ class GroupMetadataManagerTest { topicIdPartitionFailed -> new OffsetAndMetadata(offset, noLeader, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds(), noExpiration) ) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { @@ -1617,7 +1617,7 @@ class GroupMetadataManagerTest { any(), any(), any()) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } @@ -1740,7 +1740,7 @@ class GroupMetadataManagerTest { val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { @@ -1773,7 +1773,7 @@ class GroupMetadataManagerTest { any(), any(), ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) - verify(replicaManager).getMagic(any()) + verify(replicaManager).onlinePartition(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) @@ -1854,7 +1854,7 @@ class GroupMetadataManagerTest { any(), any(), any()) - verify(replicaManager, times(2)).getMagic(any()) + verify(replicaManager, times(2)).onlinePartition(any()) } @Test @@ -1871,7 +1871,7 @@ class GroupMetadataManagerTest { // expect the group metadata tombstone val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords]) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) mockGetPartition() when(partition.appendRecordsToLeader(recordsCapture.capture(), origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(), @@ -1914,7 +1914,7 @@ class GroupMetadataManagerTest { // expect the group metadata tombstone val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords]) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) mockGetPartition() when(partition.appendRecordsToLeader(recordsCapture.capture(), origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(), @@ -1984,7 +1984,7 @@ class GroupMetadataManagerTest { // expect the offset tombstone val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords]) - + when(replicaManager.onlinePartition(any())).thenReturn(Some(partition)) when(partition.appendRecordsToLeader(recordsCapture.capture(), origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(), any(), any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO) @@ -2017,7 +2017,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset) ) - verify(replicaManager).onlinePartition(groupTopicPartition) + verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) } @Test @@ -2088,7 +2088,7 @@ class GroupMetadataManagerTest { assertEquals(Some(offset), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)) assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)) - verify(replicaManager).onlinePartition(groupTopicPartition) + verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) group.transitionTo(PreparingRebalance) group.transitionTo(Empty) @@ -2114,7 +2114,7 @@ class GroupMetadataManagerTest { assertEquals(Some(offset), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)) assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)) - verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(3)).onlinePartition(groupTopicPartition) time.sleep(2) @@ -2139,7 +2139,7 @@ class GroupMetadataManagerTest { assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)) assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)) - verify(replicaManager, times(3)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(4)).onlinePartition(groupTopicPartition) // advance time to just before the offset of last partition is to be expired, no offset should expire time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1) @@ -2170,7 +2170,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset) ) - verify(replicaManager, times(4)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(5)).onlinePartition(groupTopicPartition) // advance time enough for that last offset to expire time.sleep(2) @@ -2205,7 +2205,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset) ) - verify(replicaManager, times(5)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(6)).onlinePartition(groupTopicPartition) assert(group.is(Dead)) } @@ -2261,7 +2261,7 @@ class GroupMetadataManagerTest { ) assertEquals(Some(offset), cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)) - verify(replicaManager).onlinePartition(groupTopicPartition) + verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) // advance time to enough for offsets to expire time.sleep(2) @@ -2286,7 +2286,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset) ) - verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(3)).onlinePartition(groupTopicPartition) assert(group.is(Dead)) } @@ -2401,7 +2401,7 @@ class GroupMetadataManagerTest { cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset) ) - verify(replicaManager).onlinePartition(groupTopicPartition) + verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) group.transitionTo(PreparingRebalance) @@ -2419,6 +2419,7 @@ class GroupMetadataManagerTest { group.initNextGeneration() group.transitionTo(Stable) + when(replicaManager.onlinePartition(any)).thenReturn(Some(partition)) // expect the offset tombstone when(partition.appendRecordsToLeader(any[MemoryRecords], origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(), @@ -2429,7 +2430,7 @@ class GroupMetadataManagerTest { verify(partition).appendRecordsToLeader(any[MemoryRecords], origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(), any(), any()) - verify(replicaManager, times(2)).onlinePartition(groupTopicPartition) + verify(replicaManager, times(3)).onlinePartition(groupTopicPartition) assertEquals(Some(group), groupMetadataManager.getGroup(groupId)) assert(group.is(Stable)) @@ -2932,7 +2933,7 @@ class GroupMetadataManagerTest { new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L) ) )}) - when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition]))) capturedRecords } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index ed4cf9aff46..28aef1573a9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -1256,8 +1256,6 @@ class TransactionStateManagerTest { Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L))) ) - when(replicaManager.getMagic(any())) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 5b3cc00732d..974da551e77 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -111,7 +111,7 @@ class LogCleanerManagerTest extends Logging { val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val segments = new LogSegments(tp) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) + tpDir, topicPartition, logDirFailureChannel, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) val offsets = new LogLoader( tpDir, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 2601846b1a0..0b53960dd04 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -25,17 +25,15 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ -import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_11_0_IV0, IBP_0_9_0} import org.apache.kafka.server.config.ServerConfigs import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile -import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} +import org.apache.kafka.storage.internals.log.CleanerConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, ArgumentsSource} -import scala.annotation.nowarn import scala.collection._ import scala.jdk.CollectionConverters._ @@ -136,101 +134,6 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change") } - @nowarn("cat=deprecation") - @ParameterizedTest - @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd]) - def testCleanerWithMessageFormatV0(compressionType: CompressionType): Unit = { - val codec: Compression = Compression.of(compressionType).build() - val largeMessageKey = 20 - val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, codec) - val maxMessageSize = codec match { - case Compression.NONE => largeMessageSet.sizeInBytes - case _ => - // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to - // increase because the broker offsets are larger than the ones assigned by the client - // adding `6` to the message set size is good enough for this test: it covers the increased message size while - // still being less than the overhead introduced by the conversion from message format version 0 to 1 - largeMessageSet.sizeInBytes + 6 - } - - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) - - val log = cleaner.logs.get(topicPartitions(0)) - val props = logConfigProperties(maxMessageSize = maxMessageSize) - props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version) - log.updateConfig(new LogConfig(props)) - - val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - val startSize = log.size - cleaner.startup() - - val firstDirty = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.asScala.map(_.size).sum - assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize") - - checkLogAfterAppendingDups(log, startSize, appends) - - val appends2: Seq[(Int, String, Long)] = { - val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0) - val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0) - // move LSO forward to increase compaction bound - log.updateHighWatermark(log.logEndOffset) - val largeMessageOffset = appendInfo.firstOffset - - // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly - props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_11_0_IV0.version) - log.updateConfig(new LogConfig(props)) - val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1) - val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2) - appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2 - } - val firstDirty2 = log.activeSegment.baseOffset - checkLastCleaned("log", 0, firstDirty2) - - checkLogAfterAppendingDups(log, startSize, appends2) - } - - @nowarn("cat=deprecation") - @ParameterizedTest - @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd]) - def testCleaningNestedMessagesWithV0AndV1(codec: CompressionType): Unit = { - val compression = Compression.of(codec).build() - val maxMessageSize = 192 - cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256) - - val log = cleaner.logs.get(topicPartitions(0)) - val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256) - props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version) - log.updateConfig(new LogConfig(props)) - - // with compression enabled, these messages will be written as a single message containing - // all of the individual messages - var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) - appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0) - - props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_10_0_IV1.version) - log.updateConfig(new LogConfig(props)) - - var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) - appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1) - - val appends = appendsV0 ++ appendsV1 - - val startSize = log.size - cleaner.startup() - - val firstDirty = log.activeSegment.baseOffset - assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1 - - checkLastCleaned("log", 0, firstDirty) - val compactedSize = log.logSegments.asScala.map(_.size).sum - assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize") - - checkLogAfterAppendingDups(log, startSize, appends) - } - @ParameterizedTest @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions]) def cleanerConfigUpdateTest(compressionType: CompressionType): Unit = { @@ -310,27 +213,6 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati (key, value, deepLogEntry.offset) } } - - private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: UnifiedLog, codec: Compression, - startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = { - val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { - val payload = counter.toString - incCounter() - (key, payload) - } - - val records = kvs.map { case (key, payload) => - new SimpleRecord(key.toString.getBytes, payload.getBytes) - } - - val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0) - // move LSO forward to increase compaction bound - log.updateHighWatermark(log.logEndOffset) - val offsets = appendInfo.firstOffset to appendInfo.lastOffset - - kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) } - } - } object LogCleanerParameterizedIntegrationTest { diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 81600b0f201..be9e1181301 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -190,7 +190,7 @@ class LogCleanerTest extends Logging { val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT val logSegments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - dir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) + dir, topicPartition, logDirFailureChannel, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxTransactionTimeoutMs, producerStateManagerConfig, time) val offsets = new LogLoader( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index d9fa241075d..c1e97d74ef0 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -28,14 +28,12 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import java.util.{Collections, Properties} -import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1 import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ class LogConfigTest { @@ -59,7 +57,6 @@ class LogConfigTest { }) } - @nowarn("cat=deprecation") @Test def testKafkaConfigToProps(): Unit = { val millisInHour = 60L * 60L * 1000L @@ -69,7 +66,6 @@ class LogConfigTest { kafkaProps.put(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, "2") kafkaProps.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, "2") kafkaProps.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "960") // 40 days - kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0") kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "2592000000") // 30 days kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967296") // 4 GB @@ -77,13 +73,10 @@ class LogConfigTest { assertEquals(2 * millisInHour, logProps.get(TopicConfig.SEGMENT_MS_CONFIG)) assertEquals(2 * millisInHour, logProps.get(TopicConfig.SEGMENT_JITTER_MS_CONFIG)) assertEquals(40 * millisInDay, logProps.get(TopicConfig.RETENTION_MS_CONFIG)) - // The message format version should always be 3.0 if the inter-broker protocol version is 3.0 or higher - assertEquals(IBP_3_0_IV1.version, logProps.get(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) assertEquals(30 * millisInDay, logProps.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) assertEquals(4 * bytesInGB, logProps.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } - @nowarn("cat=deprecation") @Test def testFromPropsInvalid(): Unit = { LogConfig.configNames.forEach(name => name match { @@ -93,7 +86,6 @@ class LogConfigTest { case TopicConfig.CLEANUP_POLICY_CONFIG => assertPropertyInvalid(name, "true", "foobar") case TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG => assertPropertyInvalid(name, "not_a_number", "0", "-1") - case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG => assertPropertyInvalid(name, "") case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean") case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3") case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3") diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index e7e99852c53..ec82099f730 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -28,7 +28,6 @@ import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, Me import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} @@ -50,7 +49,6 @@ import java.nio.file.{Files, NoSuchFileException, Paths} import java.util import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.{Optional, OptionalLong, Properties} -import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.collection.{Iterable, Map, mutable} import scala.jdk.CollectionConverters._ @@ -158,7 +156,7 @@ class LogLoaderTest { val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time) val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time, @@ -311,11 +309,9 @@ class LogLoaderTest { builder.build() } - @nowarn("cat=deprecation") private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = { val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "640") - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, messageFormatVersion) val logConfig = new LogConfig(logProps) var log = createLog(logDir, logConfig) assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset) @@ -343,13 +339,8 @@ class LogLoaderTest { val expectedSegmentsWithReads = mutable.Set[Long]() val expectedSnapshotOffsets = mutable.Set[Long]() - if (logConfig.messageFormatVersion.isLessThan(IBP_0_11_0_IV0)) { - expectedSegmentsWithReads += activeSegmentOffset - expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset - } else { - expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset) - expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset - } + expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset) + expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = { val maxTransactionTimeoutMs = 5 * 60 * 1000 @@ -372,7 +363,7 @@ class LogLoaderTest { } } val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, mockTime) val logLoader = new LogLoader( @@ -440,7 +431,7 @@ class LogLoaderTest { val config = new LogConfig(new Properties()) val segments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -533,119 +524,6 @@ class LogLoaderTest { log.close() } - @nowarn("cat=deprecation") - @Test - def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = { - val maxTransactionTimeoutMs = 60000 - val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false) - - val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager]) - when(stateManager.isEmpty).thenReturn(true) - when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]()) - when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig) - when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs) - - val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) - val logProps = new Properties() - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2") - val config = new LogConfig(logProps) - val logDirFailureChannel = null - val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) - val offsets = new LogLoader( - logDir, - topicPartition, - config, - mockTime.scheduler, - mockTime, - logDirFailureChannel, - false, - segments, - 0L, - 0L, - leaderEpochCache.toJava, - stateManager, - new ConcurrentHashMap[String, Integer], - false - ).load() - val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint, - offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition, - logDirFailureChannel) - new UnifiedLog(offsets.logStartOffset, - localLog, - brokerTopicStats = brokerTopicStats, - producerIdExpirationCheckIntervalMs = 30000, - leaderEpochCache = leaderEpochCache, - producerStateManager = stateManager, - _topicId = None, - keepPartitionMetadataFile = true) - - verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]]) - verify(stateManager, times(2)).updateMapEndOffset(0L) - verify(stateManager, times(2)).takeSnapshot() - verify(stateManager).isEmpty - verify(stateManager).firstUnstableOffset - verify(stateManager, times(2)).takeSnapshot() - verify(stateManager, times(2)).updateMapEndOffset(0L) - } - - @nowarn("cat=deprecation") - @Test - def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = { - val maxTransactionTimeoutMs = 60000 - val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false) - - val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager]) - when(stateManager.isEmpty).thenReturn(true) - when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]()) - when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig) - when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs) - - val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) - val logProps = new Properties() - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2") - val config = new LogConfig(logProps) - val logDirFailureChannel = null - val segments = new LogSegments(topicPartition) - val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) - val offsets = new LogLoader( - logDir, - topicPartition, - config, - mockTime.scheduler, - mockTime, - logDirFailureChannel, - true, - segments, - 0L, - 0L, - leaderEpochCache.toJava, - stateManager, - new ConcurrentHashMap[String, Integer], - false - ).load() - val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint, - offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition, - logDirFailureChannel) - new UnifiedLog(offsets.logStartOffset, - localLog, - brokerTopicStats = brokerTopicStats, - producerIdExpirationCheckIntervalMs = 30000, - leaderEpochCache = leaderEpochCache, - producerStateManager = stateManager, - _topicId = None, - keepPartitionMetadataFile = true) - - verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]]) - verify(stateManager, times(2)).updateMapEndOffset(0L) - verify(stateManager, times(2)).takeSnapshot() - verify(stateManager).isEmpty - verify(stateManager).firstUnstableOffset - } - - @nowarn("cat=deprecation") @Test def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = { val maxTransactionTimeoutMs = 60000 @@ -659,13 +537,11 @@ class LogLoaderTest { when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs) val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) - val logProps = new Properties() - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0") - val config = new LogConfig(logProps) + val config = new LogConfig(new Properties()) val logDirFailureChannel = null val segments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, @@ -888,38 +764,6 @@ class LogLoaderTest { log.close() } - /** - * Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty. - */ - @nowarn("cat=deprecation") - @Test - def testRebuildTimeIndexForOldMessages(): Unit = { - val numMessages = 200 - val segmentSize = 200 - val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize.toString) - logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1") - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") - val logConfig = new LogConfig(logProps) - var log = createLog(logDir, logConfig) - for (i <- 0 until numMessages) - log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), - timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) - val timeIndexFiles = log.logSegments.asScala.map(_.timeIndexFile()) - log.close() - - // Delete the time index. - timeIndexFiles.foreach(file => Files.delete(file.toPath)) - - // The rebuilt time index should be empty - log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1, lastShutdownClean = false) - for (segment <- log.logSegments.asScala.init) { - assertEquals(0, segment.timeIndex.entries, "The time index should be empty") - assertEquals(0, segment.timeIndexFile().length, "The time index file size should be 0") - } - } - - /** * Test that if we have corrupted an index segment it is rebuilt when the log is re-opened */ @@ -1116,30 +960,6 @@ class LogLoaderTest { Utils.delete(logDir) } - @nowarn("cat=deprecation") - @Test - def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 65536) - val log = createLog(logDir, logConfig) - log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) - assertEquals(Some(5), log.latestEpoch) - log.close() - - // reopen the log with an older message format version and check the cache - val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000") - logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1") - logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536") - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2") - val downgradedLogConfig = new LogConfig(logProps) - val reopened = createLog(logDir, downgradedLogConfig, lastShutdownClean = false) - LogTestUtils.assertLeaderEpochCacheEmpty(reopened) - - reopened.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), - magicValue = RecordVersion.V1.value), leaderEpoch = 5) - LogTestUtils.assertLeaderEpochCacheEmpty(reopened) - } - @Test def testOverCompactedLogRecoveryMultiRecord(): Unit = { // append some messages to create some segments @@ -1814,7 +1634,7 @@ class LogLoaderTest { assertEquals(5, segments.firstSegment.get.baseOffset) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val offsets = new LogLoader( logDir, topicPartition, diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 53f99bcc512..399828ddedc 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -57,7 +57,6 @@ import java.nio.ByteBuffer import java.nio.file.Files import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit} import java.util.{Optional, OptionalLong, Properties} -import scala.annotation.nowarn import scala.collection.immutable.SortedSet import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -2581,49 +2580,6 @@ class UnifiedLogTest { assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.toScala)) } - @nowarn("cat=deprecation") - @Test - def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) - val log = createLog(logDir, logConfig) - log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) - assertEquals(Some(5), log.latestEpoch) - - val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000") - logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1") - logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536") - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2") - val downgradedLogConfig = new LogConfig(logProps) - log.updateConfig(downgradedLogConfig) - LogTestUtils.assertLeaderEpochCacheEmpty(log) - - log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), - magicValue = RecordVersion.V1.value), leaderEpoch = 5) - LogTestUtils.assertLeaderEpochCacheEmpty(log) - } - - @nowarn("cat=deprecation") - @Test - def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = { - val logProps = new Properties() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000") - logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1") - logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536") - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2") - val logConfig = new LogConfig(logProps) - val log = createLog(logDir, logConfig) - log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), - magicValue = RecordVersion.V1.value), leaderEpoch = 5) - LogTestUtils.assertLeaderEpochCacheEmpty(log) - - logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0") - val upgradedLogConfig = new LogConfig(logProps) - log.updateConfig(upgradedLogConfig) - log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) - assertEquals(Some(5), log.latestEpoch) - } - @Test def testSplitOnOffsetOverflow(): Unit = { // create a log such that one log segment has offsets that overflow, and call the split API on that segment diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 4ee1a108d1d..3988ea6842b 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -49,7 +49,6 @@ import org.mockito.ArgumentMatchers.anyString import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when} -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Set @@ -277,7 +276,6 @@ class DynamicBrokerConfigTest { verifyNoMoreInteractions(remoteLogManager) } - @nowarn("cat=deprecation") @Test def testConfigUpdateWithSomeInvalidConfigs(): Unit = { val origProps = TestUtils.createBrokerConfig(0, null, port = 8181) @@ -295,9 +293,6 @@ class DynamicBrokerConfigTest { // Test update of configs with invalid type val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "invalid") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps) - - val excludedTopicConfig = Map(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG -> "0.10.2") - verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, excludedTopicConfig) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a6a01845a78..92935a887cf 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -184,7 +184,7 @@ class KafkaApisTest extends Logging { TestUtils.createBrokerConfig(brokerId, "zk") } overrideProperties.foreach( p => properties.put(p._1, p._2)) - TestUtils.setIbpAndMessageFormatVersions(properties, interBrokerProtocolVersion) + TestUtils.setIbpVersion(properties, interBrokerProtocolVersion) val config = new KafkaConfig(properties) val forwardingManagerOpt = if (enableForwarding) @@ -3039,27 +3039,6 @@ class KafkaApisTest extends Logging { () => kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)) } - @Test - def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = { - val topicPartition = new TopicPartition("t", 0) - val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition)) - val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava - val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) - - when(replicaManager.getMagic(topicPartition)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) - kafkaApis = createKafkaApis() - kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching) - - verify(requestChannel).sendResponse( - ArgumentMatchers.eq(request), - capturedResponse.capture(), - ArgumentMatchers.eq(None) - ) - val markersResponse = capturedResponse.getValue - assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L)) - } - @Test def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = { val topicPartition = new TopicPartition("t", 0) @@ -3067,7 +3046,7 @@ class KafkaApisTest extends Logging { val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) - when(replicaManager.getMagic(topicPartition)) + when(replicaManager.onlinePartition(topicPartition)) .thenReturn(None) kafkaApis = createKafkaApis() kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching) @@ -3094,8 +3073,8 @@ class KafkaApisTest extends Logging { val request = buildRequest(writeTxnMarkersRequest) val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) - when(replicaManager.getMagic(any())) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(any())) + .thenReturn(Some(mock(classOf[Partition]))) when(groupCoordinator.isNewGroupCoordinator) .thenReturn(true) when(groupCoordinator.completeTransaction( @@ -3119,46 +3098,6 @@ class KafkaApisTest extends Logging { assertEquals(2, markersResponse.errorsByProducerId.size()) } - @Test - def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = { - val tp1 = new TopicPartition("t", 0) - val tp2 = new TopicPartition("t1", 0) - val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2)) - val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava - - val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) - val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - - when(replicaManager.getMagic(tp1)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V1)) - when(replicaManager.getMagic(tp2)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) - - val requestLocal = RequestLocal.withThreadConfinedCaching - when(replicaManager.appendRecords(anyLong, - anyShort, - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(AppendOrigin.COORDINATOR), - any(), - responseCallback.capture(), - any(), - any(), - ArgumentMatchers.eq(requestLocal), - any(), - any() - )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))) - kafkaApis = createKafkaApis() - kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal) - - verify(requestChannel).sendResponse( - ArgumentMatchers.eq(request), - capturedResponse.capture(), - ArgumentMatchers.eq(None) - ) - val markersResponse = capturedResponse.getValue - assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L)) - } - @Test def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = { shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag( @@ -3261,10 +3200,10 @@ class KafkaApisTest extends Logging { val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse]) val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.getMagic(tp1)) + when(replicaManager.onlinePartition(tp1)) .thenReturn(None) - when(replicaManager.getMagic(tp2)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(tp2)) + .thenReturn(Some(mock(classOf[Partition]))) val requestLocal = RequestLocal.withThreadConfinedCaching when(replicaManager.appendRecords(anyLong, @@ -3296,8 +3235,8 @@ class KafkaApisTest extends Logging { def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(allowedAclOperation: String): Unit = { val topicPartition = new TopicPartition("t", 0) val request = createWriteTxnMarkersRequest(asList(topicPartition))._2 - when(replicaManager.getMagic(topicPartition)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(topicPartition)) + .thenReturn(Some(mock(classOf[Partition]))) val requestLocal = RequestLocal.withThreadConfinedCaching @@ -3374,8 +3313,8 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(writeTxnMarkersRequest) allPartitions.foreach { tp => - when(replicaManager.getMagic(tp)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(tp)) + .thenReturn(Some(mock(classOf[Partition]))) } when(groupCoordinator.onTransactionCompleted( @@ -3500,8 +3439,8 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(writeTxnMarkersRequest) allPartitions.foreach { tp => - when(replicaManager.getMagic(tp)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(tp)) + .thenReturn(Some(mock(classOf[Partition]))) } when(groupCoordinator.completeTransaction( @@ -3618,8 +3557,8 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(writeTxnMarkersRequest) - when(replicaManager.getMagic(offset0)) - .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) + when(replicaManager.onlinePartition(offset0)) + .thenReturn(Some(mock(classOf[Partition]))) when(groupCoordinator.completeTransaction( ArgumentMatchers.eq(offset0), diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 350e2ae64c8..92a7d7a2003 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -39,7 +39,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs @@ -48,7 +48,6 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.function.Executable -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ class KafkaConfigTest { @@ -649,7 +648,6 @@ class KafkaConfigTest { assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092")) } - @nowarn("cat=deprecation") @Test def testVersionConfiguration(): Unit = { val props = new Properties() @@ -659,15 +657,11 @@ class KafkaConfigTest { assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion) props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "0.8.2.0") - // We need to set the message format version to make the configuration valid. - props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.8.2.0") val conf2 = KafkaConfig.fromProps(props) assertEquals(IBP_0_8_2, conf2.interBrokerProtocolVersion) // check that 0.8.2.0 is the same as 0.8.2.1 props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "0.8.2.1") - // We need to set the message format version to make the configuration valid - props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.8.2.1") val conf3 = KafkaConfig.fromProps(props) assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion) @@ -818,32 +812,6 @@ class KafkaConfigTest { assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners") } - @nowarn("cat=deprecation") - @Test - def testInterBrokerVersionMessageFormatCompatibility(): Unit = { - def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: MetadataVersion): KafkaConfig = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) - props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, interBrokerProtocol.version) - props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, messageFormat.version) - KafkaConfig.fromProps(props) - } - - MetadataVersion.VERSIONS.foreach { interBrokerVersion => - MetadataVersion.VERSIONS.foreach { messageFormatVersion => - if (interBrokerVersion.highestSupportedRecordVersion.value >= messageFormatVersion.highestSupportedRecordVersion.value) { - val config = buildConfig(interBrokerVersion, messageFormatVersion) - assertEquals(interBrokerVersion, config.interBrokerProtocolVersion) - if (interBrokerVersion.isAtLeast(IBP_3_0_IV1)) - assertEquals(IBP_3_0_IV1, config.logMessageFormatVersion) - else - assertEquals(messageFormatVersion, config.logMessageFormatVersion) - } else { - assertThrows(classOf[IllegalArgumentException], () => buildConfig(interBrokerVersion, messageFormatVersion)) - } - } - } - } - @Test def testFromPropsInvalid(): Unit = { def baseProperties: Properties = { @@ -1151,7 +1119,6 @@ class KafkaConfigTest { } } - @nowarn("cat=deprecation") @Test def testDynamicLogConfigs(): Unit = { def baseProperties: Properties = { @@ -1233,7 +1200,6 @@ class KafkaConfigTest { assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs) case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes) - case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG => // not dynamically updatable case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG => // topic only config diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 68bf1ec9922..6d9fb0bb33f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2910,7 +2910,7 @@ class ReplicaManagerTest { val maxProducerIdExpirationMs = 30000 val segments = new LogSegments(tp) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "", None, time.scheduler) + logDir, tp, mockLogDirFailureChannel, "", None, time.scheduler) val producerStateManager = new ProducerStateManager(tp, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time) val offsets = new LogLoader( diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index 3aaa58641bc..cb889fe91a0 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -141,7 +141,7 @@ class SchedulerTest { val logDirFailureChannel = new LogDirFailureChannel(10) val segments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache( - logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler) + logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler) val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime) val offsets = new LogLoader( diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 379ba5a532f..389291f1800 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -78,7 +78,6 @@ import java.util import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import java.util.{Collections, Optional, Properties} -import scala.annotation.nowarn import scala.collection.mutable.ArrayBuffer import scala.collection.{Map, Seq, mutable} import scala.concurrent.duration.FiniteDuration @@ -356,12 +355,8 @@ object TestUtils extends Logging { props } - @nowarn("cat=deprecation") - def setIbpAndMessageFormatVersions(config: Properties, version: MetadataVersion): Unit = { + def setIbpVersion(config: Properties, version: MetadataVersion): Unit = { config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version) - // for clarity, only set the log message format version if it's not ignored - if (!LogConfig.shouldIgnoreMessageFormatVersion(version)) - config.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, version.version) } def createAdminClient[B <: KafkaBroker]( diff --git a/docs/upgrade.html b/docs/upgrade.html index 1713b24348e..653856f1aea 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -61,6 +61,8 @@
  • The org.apache.kafka.clients.producer.internals.DefaultPartitioner and org.apache.kafka.clients.producer.UniformStickyPartitioner class was removed.
  • +
  • The log.message.format.version and message.format.version configs were removed. +
  • Broker diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java index 21615a3f757..4cd83ef1acf 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java @@ -19,7 +19,6 @@ package org.apache.kafka.server.config; import org.apache.kafka.common.config.TopicConfig; -import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; /** @@ -106,19 +105,6 @@ public class ServerLogConfigs { public static final String LOG_PRE_ALLOCATE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.PREALLOCATE_CONFIG); public static final String LOG_PRE_ALLOCATE_ENABLE_DOC = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."; - /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ - /** - * @deprecated since "3.0" - */ - @Deprecated - public static final String LOG_MESSAGE_FORMAT_VERSION_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG); - @Deprecated - public static final String LOG_MESSAGE_FORMAT_VERSION_DEFAULT = IBP_3_0_IV1.version(); - public static final String LOG_MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. " + - "Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check MetadataVersion for more details. By setting a particular message format version, the " + - "user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly " + - "will cause consumers with older versions to break as they will receive messages with a format that they don't understand."; - public static final String LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT = "CreateTime"; public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is message create time or log append time. The value should be either " + diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index 509cb27be3a..3fc8c4435b9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -50,7 +50,6 @@ public final class ServerTopicConfigSynonyms { * the first synonym and ignore the second. */ // Topic configs with no mapping to a server config can be found in `LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS` - @SuppressWarnings("deprecation") public static final Map> ALL_TOPIC_CONFIG_SYNONYMS = Collections.unmodifiableMap(Utils.mkMap( sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG, @@ -84,7 +83,6 @@ public final class ServerTopicConfigSynonyms { sameName(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG), sameName(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG), sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG), - sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG), diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java index d2cd71addad..e48f63eafd8 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.record.FileLogInputStream; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -223,11 +222,6 @@ public class LocalLog { public void updateConfig(LogConfig newConfig) { LogConfig oldConfig = config; config = newConfig; - RecordVersion oldRecordVersion = oldConfig.recordVersion(); - RecordVersion newRecordVersion = newConfig.recordVersion(); - if (newRecordVersion.precedes(oldRecordVersion)) { - logger.warn("Record format version has been downgraded from {} to {}.", oldRecordVersion, newRecordVersion); - } } public void checkIfMemoryMappedBufferClosed() { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index f9d910858b2..e8935249261 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -27,13 +27,11 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LegacyRecord; -import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ConfigUtils; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; -import org.apache.kafka.server.common.MetadataVersionValidator; import org.apache.kafka.server.config.QuotaConfig; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.config.ServerTopicConfigSynonyms; @@ -69,45 +67,6 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; public class LogConfig extends AbstractConfig { - public static class MessageFormatVersion { - private final String messageFormatVersionString; - private final String interBrokerProtocolVersionString; - private final MetadataVersion messageFormatVersion; - private final MetadataVersion interBrokerProtocolVersion; - - public MessageFormatVersion(String messageFormatVersionString, String interBrokerProtocolVersionString) { - this.messageFormatVersionString = messageFormatVersionString; - this.interBrokerProtocolVersionString = interBrokerProtocolVersionString; - this.messageFormatVersion = MetadataVersion.fromVersionString(messageFormatVersionString); - this.interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString); - } - - public MetadataVersion messageFormatVersion() { - return messageFormatVersion; - } - - public MetadataVersion interBrokerProtocolVersion() { - return interBrokerProtocolVersion; - } - - public boolean shouldIgnore() { - return shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion); - } - - public boolean shouldWarn() { - return interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1) - && messageFormatVersion.highestSupportedRecordVersion().precedes(RecordVersion.V2); - } - - @SuppressWarnings("deprecation") - public String topicWarningMessage(String topicName) { - return "Topic configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + " with value `" - + messageFormatVersionString + "` is ignored for `" + topicName + "` because the " - + "inter-broker protocol version `" + interBrokerProtocolVersionString + "` is greater or " - + "equal than 3.0. This configuration is deprecated and it will be removed in Apache Kafka 4.0."; - } - } - private static class RemoteLogConfig { private final boolean remoteStorageEnable; @@ -187,15 +146,6 @@ public class LogConfig extends AbstractConfig { public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs - /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details - * Keep DEFAULT_MESSAGE_FORMAT_VERSION as a way to handle the deprecated value */ - @Deprecated - public static final String DEFAULT_MESSAGE_FORMAT_VERSION = ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT; - - /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */ - @SuppressWarnings("deprecation") - private static final String MESSAGE_FORMAT_VERSION_CONFIG = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; - // Visible for testing public static final Set CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of( TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, @@ -205,10 +155,6 @@ public class LogConfig extends AbstractConfig { QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG ); - @SuppressWarnings("deprecation") - private static final String MESSAGE_FORMAT_VERSION_DOC = TopicConfig.MESSAGE_FORMAT_VERSION_DOC; - - @SuppressWarnings("deprecation") public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef() .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC) .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC) @@ -240,7 +186,6 @@ public class LogConfig extends AbstractConfig { .define(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG, INT, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC) .define(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DEFAULT, HIGH, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DOC) .define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC) - .define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC) .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, ConfigDef.ValidString.in("CreateTime", "LogAppendTime"), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC) .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC) .define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC) @@ -297,8 +242,6 @@ public class LogConfig extends AbstractConfig { .define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(), CompressionType.ZSTD.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC) .define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC) - .define(MESSAGE_FORMAT_VERSION_CONFIG, STRING, DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM, - MESSAGE_FORMAT_VERSION_DOC) .define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, in("CreateTime", "LogAppendTime"), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC) .define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, @@ -347,10 +290,6 @@ public class LogConfig extends AbstractConfig { public final Optional compression; public final boolean preallocate; - /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details regarding the deprecation */ - @Deprecated - public final MetadataVersion messageFormatVersion; - public final TimestampType messageTimestampType; public final long messageTimestampBeforeMaxMs; @@ -366,7 +305,7 @@ public class LogConfig extends AbstractConfig { this(props, Collections.emptySet()); } - @SuppressWarnings({"deprecation", "this-escape"}) + @SuppressWarnings({"this-escape"}) public LogConfig(Map props, Set overriddenConfigs) { super(CONFIG, props, false); this.props = Collections.unmodifiableMap(props); @@ -400,7 +339,6 @@ public class LogConfig extends AbstractConfig { this.compressionType = BrokerCompressionType.forName(getString(TopicConfig.COMPRESSION_TYPE_CONFIG)); this.compression = getCompression(); this.preallocate = getBoolean(TopicConfig.PREALLOCATE_CONFIG); - this.messageFormatVersion = MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)); this.messageTimestampType = TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); this.messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG); this.messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG); @@ -435,10 +373,6 @@ public class LogConfig extends AbstractConfig { } } - public RecordVersion recordVersion() { - return messageFormatVersion.highestSupportedRecordVersion(); - } - // Exposed as a method so it can be mocked public int maxMessageSize() { return maxMessageSize; @@ -739,7 +673,6 @@ public class LogConfig extends AbstractConfig { ", minInSyncReplicas=" + minInSyncReplicas + ", compressionType='" + compressionType + '\'' + ", preallocate=" + preallocate + - ", messageFormatVersion=" + messageFormatVersion + ", messageTimestampType=" + messageTimestampType + ", leaderReplicationThrottledReplicas=" + leaderReplicationThrottledReplicas + ", followerReplicationThrottledReplicas=" + followerReplicationThrottledReplicas + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogLoader.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogLoader.java index db631b25cf9..1ba58d1b2a9 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogLoader.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogLoader.java @@ -238,7 +238,6 @@ public class LogLoader { segments, newLogStartOffset, recoveryOffsets.nextOffset, - config.recordVersion(), time, hadCleanShutdown, logPrefix); @@ -408,7 +407,6 @@ public class LogLoader { segments, logStartOffsetCheckpoint, segment.baseOffset(), - config.recordVersion(), time, false, logPrefix); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 6ad75a91d40..eaf700aee38 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -19,7 +19,6 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -58,7 +57,6 @@ public class UnifiedLog { LogSegments segments, long logStartOffset, long lastOffset, - RecordVersion recordVersion, Time time, boolean reloadFromCleanShutdown, String logPrefix) throws IOException { @@ -72,22 +70,20 @@ public class UnifiedLog { } offsetsToSnapshot.add(Optional.of(lastOffset)); - LOG.info("{}Loading producer state till offset {} with message format version {}", logPrefix, lastOffset, recordVersion.value); + LOG.info("{}Loading producer state till offset {}", logPrefix, lastOffset); // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case, - // but we have to be careful not to assume too much in the presence of broker failures. The two most common - // upgrade cases in which we expect to find no snapshots are the following: + // but we have to be careful not to assume too much in the presence of broker failures. The most common + // upgrade case in which we expect to find no snapshots is the following: // - // 1. The broker has been upgraded, but the topic is still on the old message format. - // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown. + // * The broker has been upgraded, and we had a clean shutdown. // - // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end + // If we hit this case, we skip producer state loading and write a new snapshot at the log end // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state // from the first segment. - if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 || - (!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown)) { + if (!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown) { // To avoid an expensive scan through all the segments, we take empty snapshots from the start of the // last two segments and the last offset. This should avoid the full scan in the case that the log needs // truncation. diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index f9dc9b8dacf..d225749fa79 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -174,11 +174,6 @@ public abstract class TopicCommand { configsToBeAdded.stream() .forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim())); LogConfig.validate(props); - if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) { - System.out.println("WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " + - "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " + - "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0."); - } return props; }