KAFKA-17393: Remove log.message.format.version/message.format.version (KIP-724) (#18267)

Based on [KIP-724](https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1), the `log.message.format.version` and `message.format.version` can be removed in 4.0.

These configs effectively a no-op with inter-broker protocol version 3.0 or higher
since Apache Kafka 3.0, so the impact should be minimal.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
PoAn Yang 2024-12-22 07:35:15 +08:00 committed by GitHub
parent 8bd3746e0c
commit b4be178599
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 214 additions and 1024 deletions

View File

@ -198,26 +198,6 @@ public class TopicConfig {
public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
"creating a new log segment.";
/**
* @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
* for most situations.
*/
@Deprecated
public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
/**
* @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
* for most situations.
*/
@Deprecated
public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
"will use to append messages to the logs. The value of this config is always assumed to be `3.0` if " +
"`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should " +
"be a valid ApiVersion. Some examples are: 0.10.0, 1.1, 2.8, 3.0. By setting a particular message format version, the " +
"user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting " +
"this value incorrectly will cause consumers with older versions to break as they will receive messages with a format " +
"that they don't understand.";
public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
"message create time or log append time.";

View File

@ -857,14 +857,13 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
) {
if (currentBatch == null) {
LogConfig logConfig = partitionWriter.config(tp);
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
magic,
RecordBatch.CURRENT_MAGIC_VALUE,
compression,
TimestampType.CREATE_TIME,
0L,

View File

@ -25,7 +25,7 @@ import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.ApiKeys
@ -36,7 +36,6 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._
@ -95,8 +94,6 @@ object ConfigCommand extends Logging {
}
}
@nowarn("cat=deprecation")
def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val props = new Properties
if (opts.options.has(opts.addConfigFile)) {
@ -115,11 +112,6 @@ object ConfigCommand extends Logging {
//Create properties, parsing square brackets from values if necessary
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
}
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
System.out.println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.")
}
validatePropsKey(props)
props
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.cluster.Partition
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock
@ -240,79 +241,78 @@ class GroupMetadataManager(brokerId: Int,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit,
requestLocal: RequestLocal = RequestLocal.noCaching): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
if (onlinePartition(partitionFor(group.groupId)).isDefined) {
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compression.`type`(),
Seq(new SimpleRecord(timestamp, key, value)).asJava))
val builder = MemoryRecords.builder(buffer, magicValue, compression, timestampType, 0L)
builder.append(timestamp, key, value)
builder.build()
}
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(RecordBatch.CURRENT_MAGIC_VALUE, compression.`type`(),
Seq(new SimpleRecord(timestamp, key, value)).asJava))
val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, timestampType, 0L)
builder.append(timestamp, key, value)
builder.build()
}
val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val groupMetadataRecords = Map(groupMetadataPartition -> records)
val generationId = group.generationId
val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val groupMetadataRecords = Map(groupMetadataPartition -> records)
val generationId = group.generationId
// set the callback function to insert the created group into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, groupMetadataPartition))
// set the callback function to insert the created group into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, groupMetadataPartition))
// construct the error status in the propagated assignment response in the cache
val status = responseStatus(groupMetadataPartition)
// construct the error status in the propagated assignment response in the cache
val status = responseStatus(groupMetadataPartition)
val responseError = if (status.error == Errors.NONE) {
Errors.NONE
} else {
debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
s"due to ${status.error.exceptionName}")
val responseError = if (status.error == Errors.NONE) {
Errors.NONE
} else {
debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
s"due to ${status.error.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
status.error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
// transform the log append error code to the corresponding the commit status error code
status.error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
case Errors.REQUEST_TIMED_OUT =>
Errors.REBALANCE_IN_PROGRESS
case Errors.REQUEST_TIMED_OUT =>
Errors.REBALANCE_IN_PROGRESS
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
Errors.UNKNOWN_SERVER_ERROR
Errors.UNKNOWN_SERVER_ERROR
case other =>
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
s"due to unexpected error: ${status.error.exceptionName}")
case other =>
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
s"due to unexpected error: ${status.error.exceptionName}")
other
}
other
}
responseCallback(responseError)
}
appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback)
case None =>
responseCallback(Errors.NOT_COORDINATOR)
responseCallback(responseError)
}
appendForGroup(group, groupMetadataRecords, requestLocal, putCacheCallback)
} else {
responseCallback(Errors.NOT_COORDINATOR)
}
}
@ -464,8 +464,7 @@ class GroupMetadataManager(brokerId: Int,
return
}
val magicOpt = getMagic(partitionFor(group.groupId))
if (magicOpt.isEmpty) {
if (onlinePartition(partitionFor(group.groupId)).isEmpty) {
val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
(topicIdPartition, Errors.NOT_COORDINATOR)
}
@ -474,7 +473,7 @@ class GroupMetadataManager(brokerId: Int,
}
val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch)
val records = generateOffsetRecords(RecordBatch.CURRENT_MAGIC_VALUE, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch)
val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records)
val verificationGuards = verificationGuard.map(guard => offsetTopicPartition -> guard).toMap
@ -868,56 +867,55 @@ class GroupMetadataManager(brokerId: Int,
(removedOffsets, group.is(Dead), group.generationId)
}
val offsetsPartition = partitionFor(groupId)
val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
getMagic(offsetsPartition) match {
case Some(magicValue) =>
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val offsetsPartition = partitionFor(groupId)
val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
onlinePartition(offsetsPartition) match {
case Some(partition) =>
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
replicaManager.onlinePartition(appendPartition).foreach { partition =>
val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
tombstones += new SimpleRecord(timestamp, commitKey, null)
}
trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
tombstones += new SimpleRecord(timestamp, commitKey, null)
}
trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
// We avoid writing the tombstone when the generationId is 0, since this group is only using
// Kafka for offset storage.
if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) {
// Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId)
tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)
trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
}
// We avoid writing the tombstone when the generationId is 0, since this group is only using
// Kafka for offset storage.
if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) {
// Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId)
tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)
trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
}
if (tombstones.nonEmpty) {
try {
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(magicValue, 0L, compression, timestampType, tombstones.toArray: _*)
partition.appendRecordsToLeader(records, origin = AppendOrigin.COORDINATOR, requiredAcks = 0,
requestLocal = requestLocal)
if (tombstones.nonEmpty) {
try {
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compression, timestampType, tombstones.toArray: _*)
partition.appendRecordsToLeader(records, origin = AppendOrigin.COORDINATOR, requiredAcks = 0,
requestLocal = requestLocal)
offsetsRemoved += removedOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
s"offsets and/or metadata for group $groupId")
} catch {
case t: Throwable =>
error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
s"offsets and/or metadata for group $groupId.", t)
// ignore and continue
}
offsetsRemoved += removedOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
s"offsets and/or metadata for group $groupId")
} catch {
case t: Throwable =>
error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
s"offsets and/or metadata for group $groupId.", t)
// ignore and continue
}
}
case None =>
info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups")
case None =>
info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups")
}
}
@ -1002,14 +1000,8 @@ class GroupMetadataManager(brokerId: Int,
// TODO: clear the caches
}
/**
* Check if the replica is local and return the message format version
*
* @param partition Partition of GroupMetadataTopic
* @return Some(MessageFormatVersion) if replica is local, None otherwise
*/
private def getMagic(partition: Int): Option[Byte] =
replicaManager.getMagic(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))
private def onlinePartition(partition: Int): Option[Partition] =
replicaManager.onlinePartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))
/**
* Add a partition to the owned partition set.

View File

@ -35,14 +35,12 @@ import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest}
import org.apache.kafka.image.TopicsImage
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}
import java.util.{Collections, OptionalLong, Properties}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{FileLock, Scheduler}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
@ -50,7 +48,6 @@ import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler,
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.util
import scala.annotation.nowarn
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@ -583,27 +580,13 @@ class LogManager(logDirs: Seq[File],
}
// visible for testing
@nowarn("cat=deprecation")
private[log] def fetchTopicConfigOverrides(defaultConfig: LogConfig, topicNames: Set[String]): Map[String, LogConfig] = {
val topicConfigOverrides = mutable.Map[String, LogConfig]()
val defaultProps = defaultConfig.originals()
topicNames.foreach { topicName =>
var overrides = configRepository.topicConfig(topicName)
val overrides = configRepository.topicConfig(topicName)
// save memory by only including configs for topics with overrides
if (!overrides.isEmpty) {
Option(overrides.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).foreach { versionString =>
val messageFormatVersion = new MessageFormatVersion(versionString, interBrokerProtocolVersion.version)
if (messageFormatVersion.shouldIgnore) {
val copy = new Properties()
copy.putAll(overrides)
copy.remove(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
overrides = copy
if (messageFormatVersion.shouldWarn)
warn(messageFormatVersion.topicWarningMessage(topicName))
}
}
val logConfig = LogConfig.fromProps(defaultProps, overrides)
topicConfigOverrides(topicName) = logConfig
}

View File

@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
@ -49,7 +48,6 @@ import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, ScheduledFuture}
import java.util.stream.Collectors
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
@ -254,10 +252,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
def updateConfig(newConfig: LogConfig): LogConfig = {
val oldConfig = localLog.config
localLog.updateConfig(newConfig)
val oldRecordVersion = oldConfig.recordVersion
val newRecordVersion = newConfig.recordVersion
if (newRecordVersion != oldRecordVersion)
initializeLeaderEpochCache()
oldConfig
}
@ -480,8 +474,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
updateHighWatermark(localLog.logEndOffsetMetadata)
}
private def recordVersion: RecordVersion = config.recordVersion
private def initializePartitionMetadata(): Unit = lock synchronized {
val partitionMetadata = PartitionMetadataFile.newFile(dir)
partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
@ -518,7 +510,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, recordVersion, logIdent, leaderEpochCache, scheduler)
dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache, scheduler)
}
private def updateHighWatermarkWithLogEndOffset(): Unit = {
@ -552,7 +544,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def rebuildProducerState(lastOffset: Long,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
localLog.checkIfMemoryMappedBufferClosed()
JUnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time, false, logIdent)
JUnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, time, false, logIdent)
}
@threadsafe
@ -795,7 +787,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
appendInfo.sourceCompression,
targetCompression,
config.compact,
config.recordVersion.value,
RecordBatch.CURRENT_MAGIC_VALUE,
config.messageTimestampType,
config.messageTimestampBeforeMaxMs,
config.messageTimestampAfterMaxMs,
@ -1269,18 +1261,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* <li>All special timestamp offset results are returned immediately irrespective of the remote storage.
* </ul>
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")
if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
s"required version $IBP_0_10_0_IV0")
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP ||
(!remoteLogEnabled() && targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) {
@ -1354,9 +1338,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (remoteLogManager.isEmpty) {
throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
}
if (recordVersion.value < RecordVersion.V2.value) {
throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
}
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
@ -2038,7 +2019,6 @@ object UnifiedLog extends Logging {
dir,
topicPartition,
logDirFailureChannel,
config.recordVersion,
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ",
None,
scheduler)
@ -2100,7 +2080,6 @@ object UnifiedLog extends Logging {
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param logPrefix The logging prefix
* @param currentCache The current LeaderEpochFileCache instance (if any)
* @param scheduler The scheduler for executing asynchronous tasks
@ -2109,23 +2088,13 @@ object UnifiedLog extends Logging {
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion,
logPrefix: String,
currentCache: Option[LeaderEpochFileCache],
scheduler: Scheduler): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
if (recordVersion.precedes(RecordVersion.V2)) {
if (leaderEpochFile.exists()) {
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
}
Files.deleteIfExists(leaderEpochFile.toPath)
None
} else {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
currentCache.map(_.withCheckpoint(checkpointFile))
.orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
currentCache.map(_.withCheckpoint(checkpointFile))
.orElse(Some(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)))
}
private[log] def replaceSegments(existingSegments: LogSegments,

View File

@ -25,7 +25,6 @@ import kafka.network.ConnectionQuotas
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs, ZooKeeperInternals}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
@ -34,9 +33,7 @@ import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.common.StopPartition
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.util.Try
@ -60,19 +57,13 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
private def updateLogConfig(topic: String,
topicConfig: Properties): Unit = {
val logManager = replicaManager.logManager
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)
val props = new Properties()
topicConfig.asScala.foreachEntry { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
// kafkaController is only defined in Zookeeper's mode
logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
logManager.updateTopicConfig(topic, topicConfig, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
wasRemoteLogEnabled, kafkaController.isDefined)
maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled)
}
@ -158,24 +149,6 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
.map(_ (0).toInt).toSeq //convert to list of partition ids
}
}
@nowarn("cat=deprecation")
private def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
// Verify message format version
Option(topicConfig.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)).flatMap { versionString =>
val messageFormatVersion = new MessageFormatVersion(versionString, kafkaConfig.interBrokerProtocolVersion.version)
if (messageFormatVersion.shouldIgnore) {
if (messageFormatVersion.shouldWarn)
warn(messageFormatVersion.topicWarningMessage(topic))
Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
} else if (kafkaConfig.interBrokerProtocolVersion.isLessThan(messageFormatVersion.messageFormatVersion)) {
warn(s"Topic configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG} is ignored for `$topic` because `$versionString` " +
s"is higher than what is allowed by the inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
Some(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
} else
None
}.toSet
}
}

View File

@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@ -662,24 +662,13 @@ trait BrokerReconfigurable {
}
object DynamicLogConfig {
/**
* The log configurations that are non-reconfigurable. This set contains the names you
* would use when setting a dynamic configuration on a topic, which are different than the
* corresponding broker configuration names.
*
* For now, message.format.version is not reconfigurable, since we need to check that
* the version is supported on all brokers in the cluster.
*/
val NonReconfigrableLogConfigs: Set[String] = Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
/**
* The broker configurations pertaining to logs that are reconfigurable. This set contains
* the names you would use when setting a static or dynamic broker configuration (not topic
* configuration).
*/
val ReconfigurableConfigs: Set[String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.values.toSet
}
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
@ -745,13 +734,6 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
val originalLogConfigMap = originalLogConfig.originals()
DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
Option(originalLogConfigMap.get(k)) match {
case None => newBrokerDefaults.remove(k)
case Some(v) => newBrokerDefaults.put(k, v)
}
})
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))

View File

@ -2251,12 +2251,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
marker.partitions.forEach { partition =>
replicaManager.getMagic(partition) match {
case Some(magic) =>
if (magic < RecordBatch.MAGIC_VALUE_V2)
currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
else
partitionsWithCompatibleMessageFormat += partition
replicaManager.onlinePartition(partition) match {
case Some(_) =>
partitionsWithCompatibleMessageFormat += partition
case None =>
currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}

View File

@ -50,10 +50,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
import scala.jdk.OptionConverters.RichOptional
@ -475,18 +473,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
@nowarn("cat=deprecation")
private val logMessageFormatVersionString = getString(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
@deprecated("3.0")
lazy val logMessageFormatVersion =
if (LogConfig.shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion))
MetadataVersion.fromVersionString(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT)
else MetadataVersion.fromVersionString(logMessageFormatVersionString)
def logMessageTimestampType = TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG))
def logMessageTimestampBeforeMaxMs: Long = getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
@ -789,7 +775,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateValues()
@nowarn("cat=deprecation")
private def validateValues(): Unit = {
if (nodeId != brokerId) {
throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.")
@ -946,15 +931,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"Currently they both have the value ${controlPlaneListenerName.get}")
}
val messageFormatVersion = new MessageFormatVersion(logMessageFormatVersionString, interBrokerProtocolVersionString)
if (messageFormatVersion.shouldWarn)
warn(createBrokerWarningMessage)
val recordVersion = logMessageFormatVersion.highestSupportedRecordVersion
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= recordVersion.value,
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
s"is set to version ${MetadataVersion.minSupportedFor(recordVersion).shortVersion} or higher")
if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
"offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
@ -1004,7 +980,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...
*/
@nowarn("cat=deprecation")
def extractLogConfigMap: java.util.Map[String, Object] = {
val logProps = new java.util.HashMap[String, Object]()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes)
@ -1030,7 +1005,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel)
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable)
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, logMessageFormatVersion.version)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
@ -1038,11 +1012,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps
}
@nowarn("cat=deprecation")
private def createBrokerWarningMessage: String = {
s"Broker configuration ${ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG} with value $logMessageFormatVersionString is ignored " +
s"because the inter-broker protocol version `$interBrokerProtocolVersionString` is greater or equal than 3.0. " +
"This configuration is deprecated and it will be removed in Apache Kafka 4.0."
}
}

View File

@ -2046,8 +2046,6 @@ class ReplicaManager(val config: KafkaConfig,
def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localLog(topicPartition).map(_.config)
def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.recordVersion.value)
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
if (updateMetadataRequest.controllerEpoch < controllerEpoch) {

View File

@ -559,7 +559,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
int voterId = brokerId + 1;
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
TestUtils.setIbpAndMessageFormatVersions(properties, MetadataVersion.latestProduction());
TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
return new KafkaConfig(properties);
}
}

View File

@ -17,21 +17,17 @@
package kafka.api
import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.util
import java.util.{Collections, Optional, Properties}
import scala.annotation.nowarn
import java.util.{Collections, Optional}
import scala.jdk.CollectionConverters._
class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTest {
@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetsForTimes(quorum: String, groupProtocol: String): Unit = {
@ -39,11 +35,8 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
val topic1 = "part-test-topic-1"
val topic2 = "part-test-topic-2"
val topic3 = "part-test-topic-3"
val props = new Properties()
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
createTopic(topic1, numParts)
// Topic2 is in old message format.
createTopic(topic2, numParts, 1, props)
createTopic(topic2, numParts)
createTopic(topic3, numParts)
val consumer = createConsumer()
@ -102,20 +95,14 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
assertNull(timestampOffsets.get(new TopicPartition(topic3, 1)))
}
@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testEarliestOrLatestOffsets(quorum: String, groupProtocol: String): Unit = {
val topic0 = "topicWithNewMessageFormat"
val topic1 = "topicWithOldMessageFormat"
val prop = new Properties()
// idempotence producer doesn't support old version of messages
prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
val producer = createProducer(configOverrides = prop)
val topic0 = "topic0"
val topic1 = "topic1"
val producer = createProducer()
createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
val props = new Properties()
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
createTopic(topic1, numPartitions = 1, replicationFactor = 1, props)
createTopic(topic1)
sendRecords(producer, numRecords = 100, new TopicPartition(topic1, 0))
val t0p0 = new TopicPartition(topic0, 0)

View File

@ -303,7 +303,7 @@ class PartitionLockTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, mockTime.scheduler)
log.dir, log.topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val maxTransactionTimeout = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
val producerStateManager = new ProducerStateManager(

View File

@ -448,7 +448,7 @@ class PartitionTest extends AbstractPartitionTest {
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "", None, time.scheduler)
log.dir, log.topicPartition, logDirFailureChannel, "", None, time.scheduler)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, true)
val producerStateManager = new ProducerStateManager(

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import kafka.utils.TestUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{RecordVersion, SimpleRecord}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.util.Optional
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1
import scala.annotation.nowarn
class PartitionWithLegacyMessageFormatTest extends AbstractPartitionTest {
// legacy message formats are only supported with IBP < 3.0
override protected def interBrokerProtocolVersion: MetadataVersion = IBP_2_8_IV1
@nowarn("cat=deprecation")
@Test
def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = {
val leaderEpoch = 8
configRepository.setTopicConfig(topicPartition.topic(),
TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, MetadataVersion.IBP_0_10_2_IV0.shortVersion)
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
log.appendAsLeader(TestUtils.records(List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)),
magicValue = RecordVersion.V1.value
), leaderEpoch = 0)
log.appendAsLeader(TestUtils.records(List(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes)),
magicValue = RecordVersion.V1.value
), leaderEpoch = 5)
assertEquals(4, log.logEndOffset)
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
assertEquals(None, log.latestEpoch)
val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of(leaderEpoch),
leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true)
assertEquals(UNDEFINED_EPOCH_OFFSET, epochEndOffset.endOffset)
assertEquals(UNDEFINED_EPOCH, epochEndOffset.leaderEpoch)
}
}

View File

@ -22,6 +22,7 @@ import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.cluster.Partition
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.{KafkaConfig, _}
@ -253,8 +254,8 @@ object AbstractCoordinatorConcurrencyTest {
producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys.toList.asJava)
}
override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
Some(RecordBatch.MAGIC_VALUE_V2)
override def onlinePartition(topicPartition: TopicPartition): Option[Partition] = {
Some(mock(classOf[Partition]))
}
def getOrCreateLogs(): mutable.Map[TopicPartition, (UnifiedLog, Long)] = {

View File

@ -416,6 +416,8 @@ class GroupCoordinatorTest {
}
// advance clock by GroupInitialRebalanceDelay to complete first InitialDelayedJoin
when(replicaManager.onlinePartition(any[TopicPartition]))
.thenReturn(Some(mock(classOf[Partition])))
timer.advanceClock(DefaultRebalanceTimeout + 1)
// Awaiting results
@ -636,8 +638,8 @@ class GroupCoordinatorTest {
}
private def verifySessionExpiration(groupId: String): Unit = {
when(replicaManager.getMagic(any[TopicPartition]))
.thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any[TopicPartition]))
.thenReturn(Some(mock(classOf[Partition])))
timer.advanceClock(DefaultSessionTimeout + 1)
@ -1591,6 +1593,7 @@ class GroupCoordinatorTest {
assertEquals(newGeneration, followerJoinGroupResult.generationId)
val leaderId = leaderJoinGroupResult.memberId
when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition])))
val leaderSyncGroupResult = syncGroupLeader(groupId, leaderJoinGroupResult.generationId, leaderId, Map(leaderId -> Array[Byte]()))
assertEquals(Errors.NONE, leaderSyncGroupResult.error)
assertTrue(getGroup(groupId).is(Stable))
@ -1749,7 +1752,6 @@ class GroupCoordinatorTest {
when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
.thenReturn(HostedPartition.None)
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
timer.advanceClock(DefaultSessionTimeout + 100)
@ -2061,8 +2063,6 @@ class GroupCoordinatorTest {
assertEquals(1, group.numPending)
assertEquals(Stable, group.currentState)
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
// advance clock to timeout the pending member
assertEquals(Set(firstMemberId), group.allMembers)
assertEquals(1, group.numPending)
@ -2155,7 +2155,6 @@ class GroupCoordinatorTest {
// Advancing Clock by > 100 (session timeout for third and fourth member)
// and < 500 (for first and second members). This will force the coordinator to attempt join
// completion on heartbeat expiration (since we are in PendingRebalance stage).
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
timer.advanceClock(120)
assertGroupState(groupState = CompletingRebalance)
@ -2230,8 +2229,8 @@ class GroupCoordinatorTest {
}
// Advance part the rebalance timeout to trigger the delayed operation.
when(replicaManager.getMagic(any[TopicPartition]))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
when(replicaManager.onlinePartition(any[TopicPartition]))
.thenReturn(Some(mock(classOf[Partition])))
timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
@ -2618,7 +2617,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3514,7 +3512,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3549,7 +3546,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3609,7 +3605,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3690,7 +3685,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3733,7 +3727,6 @@ class GroupCoordinatorTest {
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
val partition: Partition = mock(classOf[Partition])
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
@ -3928,8 +3921,6 @@ class GroupCoordinatorTest {
supportSkippingAssignment: Boolean = true): Future[JoinGroupResult] = {
val (responseFuture, responseCallback) = setupJoinGroupCallback
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
groupCoordinator.handleJoinGroup(groupId, memberId, groupInstanceId, requireKnownMemberId, supportSkippingAssignment,
"clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
responseFuture
@ -3967,7 +3958,6 @@ class GroupCoordinatorTest {
)
)
})
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
groupCoordinator.handleJoinGroup(groupId, memberId, Some(groupInstanceId), requireKnownMemberId, supportSkippingAssignment,
"clientId", "clientHost", rebalanceTimeout, sessionTimeout, protocolType, protocols, responseCallback)
@ -4004,7 +3994,7 @@ class GroupCoordinatorTest {
)
}
)
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition])))
groupCoordinator.handleSyncGroup(groupId, generation, leaderId, protocolType, protocolName,
groupInstanceId, assignment, responseCallback)
@ -4150,7 +4140,7 @@ class GroupCoordinatorTest {
)
)
})
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition])))
groupCoordinator.handleCommitOffsets(groupId, memberId, groupInstanceId, generationId, offsets, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
@ -4208,7 +4198,7 @@ class GroupCoordinatorTest {
)
)
})
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition])))
groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch,
memberId, groupInstanceId, generationId, offsets, responseCallback, RequestLocal.noCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion())
@ -4232,7 +4222,7 @@ class GroupCoordinatorTest {
when(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)))
.thenReturn(HostedPartition.None)
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
when(replicaManager.onlinePartition(any[TopicPartition])).thenReturn(Some(mock(classOf[Partition])))
groupCoordinator.handleLeaveGroup(groupId, memberIdentities, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))

View File

@ -1189,7 +1189,7 @@ class GroupMetadataManagerTest {
any(),
any(),
any())
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
@ -1227,12 +1227,12 @@ class GroupMetadataManagerTest {
any(),
any(),
any())
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
def testStoreNonEmptyGroupWhenCoordinatorHasMoved(): Unit = {
when(replicaManager.getMagic(any())).thenReturn(None)
when(replicaManager.onlinePartition(any())).thenReturn(None)
val memberId = "memberId"
val clientId = "clientId"
val clientHost = "localhost"
@ -1253,7 +1253,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.storeGroup(group, Map(memberId -> Array[Byte]()), callback)
assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
@ -1326,7 +1326,7 @@ class GroupMetadataManagerTest {
val offsets = immutable.Map(topicIdPartition -> offsetAndMetadata)
val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
@ -1348,7 +1348,7 @@ class GroupMetadataManagerTest {
any(),
any(),
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@ -1377,7 +1377,7 @@ class GroupMetadataManagerTest {
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
@ -1410,7 +1410,7 @@ class GroupMetadataManagerTest {
any(),
any(),
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
@ -1429,7 +1429,7 @@ class GroupMetadataManagerTest {
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
@ -1462,12 +1462,12 @@ class GroupMetadataManagerTest {
any(),
any(),
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
def testCommitOffsetWhenCoordinatorHasMoved(): Unit = {
when(replicaManager.getMagic(any())).thenReturn(None)
when(replicaManager.onlinePartition(any())).thenReturn(None)
val memberId = ""
val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
@ -1491,7 +1491,7 @@ class GroupMetadataManagerTest {
val maybeError = commitErrors.get.get(topicIdPartition)
assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
}
@Test
@ -1520,7 +1520,7 @@ class GroupMetadataManagerTest {
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
val offsets = immutable.Map(topicIdPartition -> new OffsetAndMetadata(offset, noLeader, "", time.milliseconds(), noExpiration))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
@ -1549,7 +1549,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
)
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
// Will not update sensor if failed
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@ -1573,7 +1573,7 @@ class GroupMetadataManagerTest {
topicIdPartitionFailed -> new OffsetAndMetadata(offset, noLeader, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds(), noExpiration)
)
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
@ -1617,7 +1617,7 @@ class GroupMetadataManagerTest {
any(),
any(),
any())
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@ -1740,7 +1740,7 @@ class GroupMetadataManagerTest {
val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
@ -1773,7 +1773,7 @@ class GroupMetadataManagerTest {
any(),
any(),
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
verify(replicaManager).getMagic(any())
verify(replicaManager).onlinePartition(any())
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@ -1854,7 +1854,7 @@ class GroupMetadataManagerTest {
any(),
any(),
any())
verify(replicaManager, times(2)).getMagic(any())
verify(replicaManager, times(2)).onlinePartition(any())
}
@Test
@ -1871,7 +1871,7 @@ class GroupMetadataManagerTest {
// expect the group metadata tombstone
val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
mockGetPartition()
when(partition.appendRecordsToLeader(recordsCapture.capture(),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
@ -1914,7 +1914,7 @@ class GroupMetadataManagerTest {
// expect the group metadata tombstone
val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
mockGetPartition()
when(partition.appendRecordsToLeader(recordsCapture.capture(),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
@ -1984,7 +1984,7 @@ class GroupMetadataManagerTest {
// expect the offset tombstone
val recordsCapture: ArgumentCaptor[MemoryRecords] = ArgumentCaptor.forClass(classOf[MemoryRecords])
when(replicaManager.onlinePartition(any())).thenReturn(Some(partition))
when(partition.appendRecordsToLeader(recordsCapture.capture(),
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
any(), any())).thenReturn(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO)
@ -2017,7 +2017,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
)
verify(replicaManager).onlinePartition(groupTopicPartition)
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
}
@Test
@ -2088,7 +2088,7 @@ class GroupMetadataManagerTest {
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager).onlinePartition(groupTopicPartition)
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
group.transitionTo(PreparingRebalance)
group.transitionTo(Empty)
@ -2114,7 +2114,7 @@ class GroupMetadataManagerTest {
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
time.sleep(2)
@ -2139,7 +2139,7 @@ class GroupMetadataManagerTest {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(4)).onlinePartition(groupTopicPartition)
// advance time to just before the offset of last partition is to be expired, no offset should expire
time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1)
@ -2170,7 +2170,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
)
verify(replicaManager, times(4)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(5)).onlinePartition(groupTopicPartition)
// advance time enough for that last offset to expire
time.sleep(2)
@ -2205,7 +2205,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
)
verify(replicaManager, times(5)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(6)).onlinePartition(groupTopicPartition)
assert(group.is(Dead))
}
@ -2261,7 +2261,7 @@ class GroupMetadataManagerTest {
)
assertEquals(Some(offset), cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
verify(replicaManager).onlinePartition(groupTopicPartition)
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
// advance time to enough for offsets to expire
time.sleep(2)
@ -2286,7 +2286,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
)
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
assert(group.is(Dead))
}
@ -2401,7 +2401,7 @@ class GroupMetadataManagerTest {
cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset)
)
verify(replicaManager).onlinePartition(groupTopicPartition)
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
group.transitionTo(PreparingRebalance)
@ -2419,6 +2419,7 @@ class GroupMetadataManagerTest {
group.initNextGeneration()
group.transitionTo(Stable)
when(replicaManager.onlinePartition(any)).thenReturn(Some(partition))
// expect the offset tombstone
when(partition.appendRecordsToLeader(any[MemoryRecords],
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
@ -2429,7 +2430,7 @@ class GroupMetadataManagerTest {
verify(partition).appendRecordsToLeader(any[MemoryRecords],
origin = ArgumentMatchers.eq(AppendOrigin.COORDINATOR), requiredAcks = anyInt(),
any(), any())
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assert(group.is(Stable))
@ -2932,7 +2933,7 @@ class GroupMetadataManagerTest {
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
)
)})
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
when(replicaManager.onlinePartition(any())).thenReturn(Some(mock(classOf[Partition])))
capturedRecords
}

View File

@ -1256,8 +1256,6 @@ class TransactionStateManagerTest {
Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
)
when(replicaManager.getMagic(any()))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
}
@Test

View File

@ -111,7 +111,7 @@ class LogCleanerManagerTest extends Logging {
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
tpDir, topicPartition, logDirFailureChannel, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(
tpDir,

View File

@ -25,17 +25,15 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_11_0_IV0, IBP_0_9_0}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, ArgumentsSource}
import scala.annotation.nowarn
import scala.collection._
import scala.jdk.CollectionConverters._
@ -136,101 +134,6 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
}
@nowarn("cat=deprecation")
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
def testCleanerWithMessageFormatV0(compressionType: CompressionType): Unit = {
val codec: Compression = Compression.of(compressionType).build()
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, codec)
val maxMessageSize = codec match {
case Compression.NONE => largeMessageSet.sizeInBytes
case _ =>
// the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
// increase because the broker offsets are larger than the ones assigned by the client
// adding `6` to the message set size is good enough for this test: it covers the increased message size while
// still being less than the overhead introduced by the conversion from message format version 0 to 1
largeMessageSet.sizeInBytes + 6
}
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
val log = cleaner.logs.get(topicPartitions(0))
val props = logConfigProperties(maxMessageSize = maxMessageSize)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version)
log.updateConfig(new LogConfig(props))
val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val startSize = log.size
cleaner.startup()
val firstDirty = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.asScala.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
checkLogAfterAppendingDups(log, startSize, appends)
val appends2: Seq[(Int, String, Long)] = {
val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
val largeMessageOffset = appendInfo.firstOffset
// also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_11_0_IV0.version)
log.updateConfig(new LogConfig(props))
val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
}
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
checkLogAfterAppendingDups(log, startSize, appends2)
}
@nowarn("cat=deprecation")
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
def testCleaningNestedMessagesWithV0AndV1(codec: CompressionType): Unit = {
val compression = Compression.of(codec).build()
val maxMessageSize = 192
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
val log = cleaner.logs.get(topicPartitions(0))
val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version)
log.updateConfig(new LogConfig(props))
// with compression enabled, these messages will be written as a single message containing
// all of the individual messages
var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V0)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_10_0_IV1.version)
log.updateConfig(new LogConfig(props))
var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = compression, magicValue = RecordBatch.MAGIC_VALUE_V1)
val appends = appendsV0 ++ appendsV1
val startSize = log.size
cleaner.startup()
val firstDirty = log.activeSegment.baseOffset
assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.asScala.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
checkLogAfterAppendingDups(log, startSize, appends)
}
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
def cleanerConfigUpdateTest(compressionType: CompressionType): Unit = {
@ -310,27 +213,6 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
(key, value, deepLogEntry.offset)
}
}
private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: UnifiedLog, codec: Compression,
startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
incCounter()
(key, payload)
}
val records = kvs.map { case (key, payload) =>
new SimpleRecord(key.toString.getBytes, payload.getBytes)
}
val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
val offsets = appendInfo.firstOffset to appendInfo.lastOffset
kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
}
}
object LogCleanerParameterizedIntegrationTest {

View File

@ -190,7 +190,7 @@ class LogCleanerTest extends Logging {
val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
dir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
dir, topicPartition, logDirFailureChannel, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val offsets = new LogLoader(

View File

@ -28,14 +28,12 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.util.{Collections, Properties}
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
class LogConfigTest {
@ -59,7 +57,6 @@ class LogConfigTest {
})
}
@nowarn("cat=deprecation")
@Test
def testKafkaConfigToProps(): Unit = {
val millisInHour = 60L * 60L * 1000L
@ -69,7 +66,6 @@ class LogConfigTest {
kafkaProps.put(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, "2")
kafkaProps.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, "2")
kafkaProps.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, "960") // 40 days
kafkaProps.put(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0")
kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "2592000000") // 30 days
kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967296") // 4 GB
@ -77,13 +73,10 @@ class LogConfigTest {
assertEquals(2 * millisInHour, logProps.get(TopicConfig.SEGMENT_MS_CONFIG))
assertEquals(2 * millisInHour, logProps.get(TopicConfig.SEGMENT_JITTER_MS_CONFIG))
assertEquals(40 * millisInDay, logProps.get(TopicConfig.RETENTION_MS_CONFIG))
// The message format version should always be 3.0 if the inter-broker protocol version is 3.0 or higher
assertEquals(IBP_3_0_IV1.version, logProps.get(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG))
assertEquals(30 * millisInDay, logProps.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
assertEquals(4 * bytesInGB, logProps.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
}
@nowarn("cat=deprecation")
@Test
def testFromPropsInvalid(): Unit = {
LogConfig.configNames.forEach(name => name match {
@ -93,7 +86,6 @@ class LogConfigTest {
case TopicConfig.CLEANUP_POLICY_CONFIG => assertPropertyInvalid(name, "true", "foobar")
case TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG => assertPropertyInvalid(name, "not_a_number", "0", "-1")
case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG => assertPropertyInvalid(name, "")
case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, Me
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
@ -50,7 +49,6 @@ import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Map, mutable}
import scala.jdk.CollectionConverters._
@ -158,7 +156,7 @@ class LogLoaderTest {
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, time.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
@ -311,11 +309,9 @@ class LogLoaderTest {
builder.build()
}
@nowarn("cat=deprecation")
private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "640")
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, messageFormatVersion)
val logConfig = new LogConfig(logProps)
var log = createLog(logDir, logConfig)
assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset)
@ -343,13 +339,8 @@ class LogLoaderTest {
val expectedSegmentsWithReads = mutable.Set[Long]()
val expectedSnapshotOffsets = mutable.Set[Long]()
if (logConfig.messageFormatVersion.isLessThan(IBP_0_11_0_IV0)) {
expectedSegmentsWithReads += activeSegmentOffset
expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
} else {
expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset)
expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
}
expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset)
expectedSnapshotOffsets ++= log.logSegments.asScala.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
def createLogWithInterceptedReads(recoveryPoint: Long): UnifiedLog = {
val maxTransactionTimeoutMs = 5 * 60 * 1000
@ -372,7 +363,7 @@ class LogLoaderTest {
}
}
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, producerStateManagerConfig, mockTime)
val logLoader = new LogLoader(
@ -440,7 +431,7 @@ class LogLoaderTest {
val config = new LogConfig(new Properties())
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@ -533,119 +524,6 @@ class LogLoaderTest {
log.close()
}
@nowarn("cat=deprecation")
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)
when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]())
when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs)
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logProps = new Properties()
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2")
val config = new LogConfig(logProps)
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
false,
segments,
0L,
0L,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
logDirFailureChannel)
new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats = brokerTopicStats,
producerIdExpirationCheckIntervalMs = 30000,
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
_topicId = None,
keepPartitionMetadataFile = true)
verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
verify(stateManager, times(2)).updateMapEndOffset(0L)
verify(stateManager, times(2)).takeSnapshot()
verify(stateManager).isEmpty
verify(stateManager).firstUnstableOffset
verify(stateManager, times(2)).takeSnapshot()
verify(stateManager, times(2)).updateMapEndOffset(0L)
}
@nowarn("cat=deprecation")
@Test
def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
val producerStateManagerConfig = new ProducerStateManagerConfig(300000, false)
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)
when(stateManager.firstUnstableOffset).thenReturn(Optional.empty[LogOffsetMetadata]())
when(stateManager.producerStateManagerConfig).thenReturn(producerStateManagerConfig)
when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs)
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logProps = new Properties()
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2")
val config = new LogConfig(logProps)
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
true,
segments,
0L,
0L,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
).load()
val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
logDirFailureChannel)
new UnifiedLog(offsets.logStartOffset,
localLog,
brokerTopicStats = brokerTopicStats,
producerIdExpirationCheckIntervalMs = 30000,
leaderEpochCache = leaderEpochCache,
producerStateManager = stateManager,
_topicId = None,
keepPartitionMetadataFile = true)
verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
verify(stateManager, times(2)).updateMapEndOffset(0L)
verify(stateManager, times(2)).takeSnapshot()
verify(stateManager).isEmpty
verify(stateManager).firstUnstableOffset
}
@nowarn("cat=deprecation")
@Test
def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
val maxTransactionTimeoutMs = 60000
@ -659,13 +537,11 @@ class LogLoaderTest {
when(stateManager.maxTransactionTimeoutMs).thenReturn(maxTransactionTimeoutMs)
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logProps = new Properties()
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0")
val config = new LogConfig(logProps)
val config = new LogConfig(new Properties())
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, config.recordVersion, "", None, mockTime.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,
@ -888,38 +764,6 @@ class LogLoaderTest {
log.close()
}
/**
* Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty.
*/
@nowarn("cat=deprecation")
@Test
def testRebuildTimeIndexForOldMessages(): Unit = {
val numMessages = 200
val segmentSize = 200
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize.toString)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
val logConfig = new LogConfig(logProps)
var log = createLog(logDir, logConfig)
for (i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
val timeIndexFiles = log.logSegments.asScala.map(_.timeIndexFile())
log.close()
// Delete the time index.
timeIndexFiles.foreach(file => Files.delete(file.toPath))
// The rebuilt time index should be empty
log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1, lastShutdownClean = false)
for (segment <- log.logSegments.asScala.init) {
assertEquals(0, segment.timeIndex.entries, "The time index should be empty")
assertEquals(0, segment.timeIndexFile().length, "The time index file size should be 0")
}
}
/**
* Test that if we have corrupted an index segment it is rebuilt when the log is re-opened
*/
@ -1116,30 +960,6 @@ class LogLoaderTest {
Utils.delete(logDir)
}
@nowarn("cat=deprecation")
@Test
def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 65536)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
log.close()
// reopen the log with an older message format version and check the cache
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000")
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2")
val downgradedLogConfig = new LogConfig(logProps)
val reopened = createLog(logDir, downgradedLogConfig, lastShutdownClean = false)
LogTestUtils.assertLeaderEpochCacheEmpty(reopened)
reopened.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
magicValue = RecordVersion.V1.value), leaderEpoch = 5)
LogTestUtils.assertLeaderEpochCacheEmpty(reopened)
}
@Test
def testOverCompactedLogRecoveryMultiRecord(): Unit = {
// append some messages to create some segments
@ -1814,7 +1634,7 @@ class LogLoaderTest {
assertEquals(5, segments.firstSegment.get.baseOffset)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val offsets = new LogLoader(
logDir,
topicPartition,

View File

@ -57,7 +57,6 @@ import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.immutable.SortedSet
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
@ -2581,49 +2580,6 @@ class UnifiedLogTest {
assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
}
@nowarn("cat=deprecation")
@Test
def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000")
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2")
val downgradedLogConfig = new LogConfig(logProps)
log.updateConfig(downgradedLogConfig)
LogTestUtils.assertLeaderEpochCacheEmpty(log)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
magicValue = RecordVersion.V1.value), leaderEpoch = 5)
LogTestUtils.assertLeaderEpochCacheEmpty(log)
}
@nowarn("cat=deprecation")
@Test
def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1000")
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.10.2")
val logConfig = new LogConfig(logProps)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
magicValue = RecordVersion.V1.value), leaderEpoch = 5)
LogTestUtils.assertLeaderEpochCacheEmpty(log)
logProps.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.11.0")
val upgradedLogConfig = new LogConfig(logProps)
log.updateConfig(upgradedLogConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.latestEpoch)
}
@Test
def testSplitOnOffsetOverflow(): Unit = {
// create a log such that one log segment has offsets that overflow, and call the split API on that segment

View File

@ -49,7 +49,6 @@ import org.mockito.ArgumentMatchers.anyString
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Set
@ -277,7 +276,6 @@ class DynamicBrokerConfigTest {
verifyNoMoreInteractions(remoteLogManager)
}
@nowarn("cat=deprecation")
@Test
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
@ -295,9 +293,6 @@ class DynamicBrokerConfigTest {
// Test update of configs with invalid type
val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "invalid")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
val excludedTopicConfig = Map(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG -> "0.10.2")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, excludedTopicConfig)
}
@Test

View File

@ -184,7 +184,7 @@ class KafkaApisTest extends Logging {
TestUtils.createBrokerConfig(brokerId, "zk")
}
overrideProperties.foreach( p => properties.put(p._1, p._2))
TestUtils.setIbpAndMessageFormatVersions(properties, interBrokerProtocolVersion)
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
val config = new KafkaConfig(properties)
val forwardingManagerOpt = if (enableForwarding)
@ -3039,27 +3039,6 @@ class KafkaApisTest extends Logging {
() => kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching))
}
@Test
def shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired(): Unit = {
val topicPartition = new TopicPartition("t", 0)
val (_, request) = createWriteTxnMarkersRequest(asList(topicPartition))
val expectedErrors = Map(topicPartition -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
when(replicaManager.getMagic(topicPartition))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
ArgumentMatchers.eq(None)
)
val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
}
@Test
def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
val topicPartition = new TopicPartition("t", 0)
@ -3067,7 +3046,7 @@ class KafkaApisTest extends Logging {
val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
when(replicaManager.getMagic(topicPartition))
when(replicaManager.onlinePartition(topicPartition))
.thenReturn(None)
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
@ -3094,8 +3073,8 @@ class KafkaApisTest extends Logging {
val request = buildRequest(writeTxnMarkersRequest)
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
when(replicaManager.getMagic(any()))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(any()))
.thenReturn(Some(mock(classOf[Partition])))
when(groupCoordinator.isNewGroupCoordinator)
.thenReturn(true)
when(groupCoordinator.completeTransaction(
@ -3119,46 +3098,6 @@ class KafkaApisTest extends Logging {
assertEquals(2, markersResponse.errorsByProducerId.size())
}
@Test
def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
val tp1 = new TopicPartition("t", 0)
val tp2 = new TopicPartition("t1", 0)
val (_, request) = createWriteTxnMarkersRequest(asList(tp1, tp2))
val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 -> Errors.NONE).asJava
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.getMagic(tp1))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V1))
when(replicaManager.getMagic(tp2))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
val requestLocal = RequestLocal.withThreadConfinedCaching
when(replicaManager.appendRecords(anyLong,
anyShort,
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
any(),
responseCallback.capture(),
any(),
any(),
ArgumentMatchers.eq(requestLocal),
any(),
any()
)).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
kafkaApis = createKafkaApis()
kafkaApis.handleWriteTxnMarkersRequest(request, requestLocal)
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
ArgumentMatchers.eq(None)
)
val markersResponse = capturedResponse.getValue
assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
}
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
@ -3261,10 +3200,10 @@ class KafkaApisTest extends Logging {
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.getMagic(tp1))
when(replicaManager.onlinePartition(tp1))
.thenReturn(None)
when(replicaManager.getMagic(tp2))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(tp2))
.thenReturn(Some(mock(classOf[Partition])))
val requestLocal = RequestLocal.withThreadConfinedCaching
when(replicaManager.appendRecords(anyLong,
@ -3296,8 +3235,8 @@ class KafkaApisTest extends Logging {
def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(allowedAclOperation: String): Unit = {
val topicPartition = new TopicPartition("t", 0)
val request = createWriteTxnMarkersRequest(asList(topicPartition))._2
when(replicaManager.getMagic(topicPartition))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(topicPartition))
.thenReturn(Some(mock(classOf[Partition])))
val requestLocal = RequestLocal.withThreadConfinedCaching
@ -3374,8 +3313,8 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
allPartitions.foreach { tp =>
when(replicaManager.getMagic(tp))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(tp))
.thenReturn(Some(mock(classOf[Partition])))
}
when(groupCoordinator.onTransactionCompleted(
@ -3500,8 +3439,8 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
allPartitions.foreach { tp =>
when(replicaManager.getMagic(tp))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(tp))
.thenReturn(Some(mock(classOf[Partition])))
}
when(groupCoordinator.completeTransaction(
@ -3618,8 +3557,8 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
when(replicaManager.getMagic(offset0))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
when(replicaManager.onlinePartition(offset0))
.thenReturn(Some(mock(classOf[Partition])))
when(groupCoordinator.completeTransaction(
ArgumentMatchers.eq(offset0),

View File

@ -39,7 +39,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
@ -48,7 +48,6 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
class KafkaConfigTest {
@ -649,7 +648,6 @@ class KafkaConfigTest {
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
}
@nowarn("cat=deprecation")
@Test
def testVersionConfiguration(): Unit = {
val props = new Properties()
@ -659,15 +657,11 @@ class KafkaConfigTest {
assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "0.8.2.0")
// We need to set the message format version to make the configuration valid.
props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.8.2.0")
val conf2 = KafkaConfig.fromProps(props)
assertEquals(IBP_0_8_2, conf2.interBrokerProtocolVersion)
// check that 0.8.2.0 is the same as 0.8.2.1
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "0.8.2.1")
// We need to set the message format version to make the configuration valid
props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, "0.8.2.1")
val conf3 = KafkaConfig.fromProps(props)
assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion)
@ -818,32 +812,6 @@ class KafkaConfigTest {
assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
}
@nowarn("cat=deprecation")
@Test
def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: MetadataVersion): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, interBrokerProtocol.version)
props.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, messageFormat.version)
KafkaConfig.fromProps(props)
}
MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
if (interBrokerVersion.highestSupportedRecordVersion.value >= messageFormatVersion.highestSupportedRecordVersion.value) {
val config = buildConfig(interBrokerVersion, messageFormatVersion)
assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
if (interBrokerVersion.isAtLeast(IBP_3_0_IV1))
assertEquals(IBP_3_0_IV1, config.logMessageFormatVersion)
else
assertEquals(messageFormatVersion, config.logMessageFormatVersion)
} else {
assertThrows(classOf[IllegalArgumentException], () => buildConfig(interBrokerVersion, messageFormatVersion))
}
}
}
}
@Test
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
@ -1151,7 +1119,6 @@ class KafkaConfigTest {
}
}
@nowarn("cat=deprecation")
@Test
def testDynamicLogConfigs(): Unit = {
def baseProperties: Properties = {
@ -1233,7 +1200,6 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG =>
// not dynamically updatable
case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
// topic only config

View File

@ -2910,7 +2910,7 @@ class ReplicaManagerTest {
val maxProducerIdExpirationMs = 30000
val segments = new LogSegments(tp)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "", None, time.scheduler)
logDir, tp, mockLogDirFailureChannel, "", None, time.scheduler)
val producerStateManager = new ProducerStateManager(tp, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time)
val offsets = new LogLoader(

View File

@ -141,7 +141,7 @@ class SchedulerTest {
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "", None, mockTime.scheduler)
logDir, topicPartition, logDirFailureChannel, "", None, mockTime.scheduler)
val producerStateManager = new ProducerStateManager(topicPartition, logDir,
maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), mockTime)
val offsets = new LogLoader(

View File

@ -78,7 +78,6 @@ import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
@ -356,12 +355,8 @@ object TestUtils extends Logging {
props
}
@nowarn("cat=deprecation")
def setIbpAndMessageFormatVersions(config: Properties, version: MetadataVersion): Unit = {
def setIbpVersion(config: Properties, version: MetadataVersion): Unit = {
config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version)
// for clarity, only set the log message format version if it's not ignored
if (!LogConfig.shouldIgnoreMessageFormatVersion(version))
config.setProperty(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, version.version)
}
def createAdminClient[B <: KafkaBroker](

View File

@ -61,6 +61,8 @@
</li>
<li>The <code>org.apache.kafka.clients.producer.internals.DefaultPartitioner</code> and <code>org.apache.kafka.clients.producer.UniformStickyPartitioner</code> class was removed.
</li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed.
</li>
</ul>
</li>
<li><b>Broker</b>

View File

@ -19,7 +19,6 @@ package org.apache.kafka.server.config;
import org.apache.kafka.common.config.TopicConfig;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
/**
@ -106,19 +105,6 @@ public class ServerLogConfigs {
public static final String LOG_PRE_ALLOCATE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.PREALLOCATE_CONFIG);
public static final String LOG_PRE_ALLOCATE_ENABLE_DOC = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true.";
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
/**
* @deprecated since "3.0"
*/
@Deprecated
public static final String LOG_MESSAGE_FORMAT_VERSION_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG);
@Deprecated
public static final String LOG_MESSAGE_FORMAT_VERSION_DEFAULT = IBP_3_0_IV1.version();
public static final String LOG_MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. " +
"Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check MetadataVersion for more details. By setting a particular message format version, the " +
"user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly " +
"will cause consumers with older versions to break as they will receive messages with a format that they don't understand.";
public static final String LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG);
public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT = "CreateTime";
public static final String LOG_MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is message create time or log append time. The value should be either " +

View File

@ -50,7 +50,6 @@ public final class ServerTopicConfigSynonyms {
* the first synonym and ignore the second.
*/
// Topic configs with no mapping to a server config can be found in `LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS`
@SuppressWarnings("deprecation")
public static final Map<String, List<ConfigSynonym>> ALL_TOPIC_CONFIG_SYNONYMS = Collections.unmodifiableMap(Utils.mkMap(
sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG,
@ -84,7 +83,6 @@ public final class ServerTopicConfigSynonyms {
sameName(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG),
sameName(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG),
sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),

View File

@ -24,7 +24,6 @@ import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -223,11 +222,6 @@ public class LocalLog {
public void updateConfig(LogConfig newConfig) {
LogConfig oldConfig = config;
config = newConfig;
RecordVersion oldRecordVersion = oldConfig.recordVersion();
RecordVersion newRecordVersion = newConfig.recordVersion();
if (newRecordVersion.precedes(oldRecordVersion)) {
logger.warn("Record format version has been downgraded from {} to {}.", oldRecordVersion, newRecordVersion);
}
}
public void checkIfMemoryMappedBufferClosed() {

View File

@ -27,13 +27,11 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionValidator;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
@ -69,45 +67,6 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
public class LogConfig extends AbstractConfig {
public static class MessageFormatVersion {
private final String messageFormatVersionString;
private final String interBrokerProtocolVersionString;
private final MetadataVersion messageFormatVersion;
private final MetadataVersion interBrokerProtocolVersion;
public MessageFormatVersion(String messageFormatVersionString, String interBrokerProtocolVersionString) {
this.messageFormatVersionString = messageFormatVersionString;
this.interBrokerProtocolVersionString = interBrokerProtocolVersionString;
this.messageFormatVersion = MetadataVersion.fromVersionString(messageFormatVersionString);
this.interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString);
}
public MetadataVersion messageFormatVersion() {
return messageFormatVersion;
}
public MetadataVersion interBrokerProtocolVersion() {
return interBrokerProtocolVersion;
}
public boolean shouldIgnore() {
return shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion);
}
public boolean shouldWarn() {
return interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)
&& messageFormatVersion.highestSupportedRecordVersion().precedes(RecordVersion.V2);
}
@SuppressWarnings("deprecation")
public String topicWarningMessage(String topicName) {
return "Topic configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + " with value `"
+ messageFormatVersionString + "` is ignored for `" + topicName + "` because the "
+ "inter-broker protocol version `" + interBrokerProtocolVersionString + "` is greater or "
+ "equal than 3.0. This configuration is deprecated and it will be removed in Apache Kafka 4.0.";
}
}
private static class RemoteLogConfig {
private final boolean remoteStorageEnable;
@ -187,15 +146,6 @@ public class LogConfig extends AbstractConfig {
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details
* Keep DEFAULT_MESSAGE_FORMAT_VERSION as a way to handle the deprecated value */
@Deprecated
public static final String DEFAULT_MESSAGE_FORMAT_VERSION = ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT;
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
@SuppressWarnings("deprecation")
private static final String MESSAGE_FORMAT_VERSION_CONFIG = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Set.of(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
@ -205,10 +155,6 @@ public class LogConfig extends AbstractConfig {
QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
);
@SuppressWarnings("deprecation")
private static final String MESSAGE_FORMAT_VERSION_DOC = TopicConfig.MESSAGE_FORMAT_VERSION_DOC;
@SuppressWarnings("deprecation")
public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
.define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
@ -240,7 +186,6 @@ public class LogConfig extends AbstractConfig {
.define(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG, INT, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC)
.define(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DEFAULT, HIGH, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DOC)
.define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, ConfigDef.ValidString.in("CreateTime", "LogAppendTime"), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
@ -297,8 +242,6 @@ public class LogConfig extends AbstractConfig {
.define(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, CompressionType.ZSTD.defaultLevel(),
CompressionType.ZSTD.levelValidator(), MEDIUM, TopicConfig.COMPRESSION_ZSTD_LEVEL_DOC)
.define(TopicConfig.PREALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, TopicConfig.PREALLOCATE_DOC)
.define(MESSAGE_FORMAT_VERSION_CONFIG, STRING, DEFAULT_MESSAGE_FORMAT_VERSION, new MetadataVersionValidator(), MEDIUM,
MESSAGE_FORMAT_VERSION_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT,
in("CreateTime", "LogAppendTime"), MEDIUM, TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC)
.define(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT,
@ -347,10 +290,6 @@ public class LogConfig extends AbstractConfig {
public final Optional<Compression> compression;
public final boolean preallocate;
/* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details regarding the deprecation */
@Deprecated
public final MetadataVersion messageFormatVersion;
public final TimestampType messageTimestampType;
public final long messageTimestampBeforeMaxMs;
@ -366,7 +305,7 @@ public class LogConfig extends AbstractConfig {
this(props, Collections.emptySet());
}
@SuppressWarnings({"deprecation", "this-escape"})
@SuppressWarnings({"this-escape"})
public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
super(CONFIG, props, false);
this.props = Collections.unmodifiableMap(props);
@ -400,7 +339,6 @@ public class LogConfig extends AbstractConfig {
this.compressionType = BrokerCompressionType.forName(getString(TopicConfig.COMPRESSION_TYPE_CONFIG));
this.compression = getCompression();
this.preallocate = getBoolean(TopicConfig.PREALLOCATE_CONFIG);
this.messageFormatVersion = MetadataVersion.fromVersionString(getString(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG));
this.messageTimestampType = TimestampType.forName(getString(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
this.messageTimestampBeforeMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
this.messageTimestampAfterMaxMs = getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
@ -435,10 +373,6 @@ public class LogConfig extends AbstractConfig {
}
}
public RecordVersion recordVersion() {
return messageFormatVersion.highestSupportedRecordVersion();
}
// Exposed as a method so it can be mocked
public int maxMessageSize() {
return maxMessageSize;
@ -739,7 +673,6 @@ public class LogConfig extends AbstractConfig {
", minInSyncReplicas=" + minInSyncReplicas +
", compressionType='" + compressionType + '\'' +
", preallocate=" + preallocate +
", messageFormatVersion=" + messageFormatVersion +
", messageTimestampType=" + messageTimestampType +
", leaderReplicationThrottledReplicas=" + leaderReplicationThrottledReplicas +
", followerReplicationThrottledReplicas=" + followerReplicationThrottledReplicas +

View File

@ -238,7 +238,6 @@ public class LogLoader {
segments,
newLogStartOffset,
recoveryOffsets.nextOffset,
config.recordVersion(),
time,
hadCleanShutdown,
logPrefix);
@ -408,7 +407,6 @@ public class LogLoader {
segments,
logStartOffsetCheckpoint,
segment.baseOffset(),
config.recordVersion(),
time,
false,
logPrefix);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -58,7 +57,6 @@ public class UnifiedLog {
LogSegments segments,
long logStartOffset,
long lastOffset,
RecordVersion recordVersion,
Time time,
boolean reloadFromCleanShutdown,
String logPrefix) throws IOException {
@ -72,22 +70,20 @@ public class UnifiedLog {
}
offsetsToSnapshot.add(Optional.of(lastOffset));
LOG.info("{}Loading producer state till offset {} with message format version {}", logPrefix, lastOffset, recordVersion.value);
LOG.info("{}Loading producer state till offset {}", logPrefix, lastOffset);
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
// but we have to be careful not to assume too much in the presence of broker failures. The two most common
// upgrade cases in which we expect to find no snapshots are the following:
// but we have to be careful not to assume too much in the presence of broker failures. The most common
// upgrade case in which we expect to find no snapshots is the following:
//
// 1. The broker has been upgraded, but the topic is still on the old message format.
// 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
// * The broker has been upgraded, and we had a clean shutdown.
//
// If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
// If we hit this case, we skip producer state loading and write a new snapshot at the log end
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
(!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown)) {
if (!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown) {
// To avoid an expensive scan through all the segments, we take empty snapshots from the start of the
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
// truncation.

View File

@ -174,11 +174,6 @@ public abstract class TopicCommand {
configsToBeAdded.stream()
.forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim()));
LogConfig.validate(props);
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
System.out.println("WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.");
}
return props;
}