KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators (#16183)

This change includes adding transaction.version (part of KIP-1022)

New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.

Transaction version 2 is created in anticipation for further KIP-890 changes.

Neither are made production ready. Tests for the new transaction version and new MV are created.

Also include change to not report a feature as supported if the range is 0-0.

Reviewers: Jun Rao <junrao@apache.org>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Justine Olshan 2024-07-26 11:38:44 -07:00 committed by GitHub
parent a07294a732
commit a0f6e6f816
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 246 additions and 64 deletions

View File

@ -55,7 +55,7 @@ object TransactionCoordinator {
config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
config.requestTimeoutMs)
val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, txnConfig,
val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,
time, metrics)
val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")

View File

@ -62,7 +62,8 @@ object TransactionLog {
*
* @return value payload bytes
*/
private[transaction] def valueToBytes(txnMetadata: TxnTransitMetadata): Array[Byte] = {
private[transaction] def valueToBytes(txnMetadata: TxnTransitMetadata,
usesFlexibleRecords: Boolean): Array[Byte] = {
if (txnMetadata.txnState == Empty && txnMetadata.topicPartitions.nonEmpty)
throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
@ -75,9 +76,11 @@ object TransactionLog {
.setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
}.toList.asJava
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
MessageUtil.toVersionPrefixedBytes(0,
// Serialize with version 0 (highest non-flexible version) until transaction.version 1 is enabled
// which enables flexible fields in records.
val version: Short =
if (usesFlexibleRecords) 1 else 0
MessageUtil.toVersionPrefixedBytes(version,
new TransactionLogValue()
.setProducerId(txnMetadata.producerId)
.setProducerEpoch(txnMetadata.producerEpoch)

View File

@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.{ReplicaManager, RequestLocal}
import kafka.server.{MetadataCache, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool}
import kafka.utils.Implicits._
@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
@ -65,6 +66,7 @@ import scala.collection.mutable
class TransactionStateManager(brokerId: Int,
scheduler: Scheduler,
replicaManager: ReplicaManager,
metadataCache: MetadataCache,
config: TransactionConfig,
time: Time,
metrics: Metrics) extends Logging {
@ -99,6 +101,10 @@ class TransactionStateManager(brokerId: Int,
TransactionStateManagerConfigs.METRICS_GROUP,
"The avg time it took to load the partitions in the last 30sec"), new Avg())
private[transaction] def usesFlexibleRecords(): Boolean = {
metadataCache.features().finalizedFeatures().getOrDefault(TransactionVersion.FEATURE_NAME, 0.toShort) > 0
}
// visible for testing only
private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@ -618,7 +624,7 @@ class TransactionStateManager(brokerId: Int,
// generate the message for this transaction metadata
val keyBytes = TransactionLog.keyToBytes(transactionalId)
val valueBytes = TransactionLog.valueToBytes(newMetadata)
val valueBytes = TransactionLog.valueToBytes(newMetadata, usesFlexibleRecords())
val timestamp = time.milliseconds()
val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompression, new SimpleRecord(timestamp, keyBytes, valueBytes))

View File

@ -90,13 +90,15 @@ object BrokerFeatures extends Logging {
} else {
MetadataVersion.latestProduction.featureLevel
}))
PRODUCTION_FEATURES.forEach { feature => features.put(feature.featureName,
new SupportedVersionRange(0,
if (unstableFeatureVersionsEnabled) {
PRODUCTION_FEATURES.forEach {
feature =>
val maxVersion = if (unstableFeatureVersionsEnabled)
feature.latestTesting
} else {
else
feature.latestProduction
}))
if (maxVersion > 0) {
features.put(feature.featureName, new SupportedVersionRange(feature.minimumProduction(), maxVersion))
}
}
Features.supportedFeatures(features)
}

View File

@ -37,7 +37,7 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.zookeeper.client.ZKClientConfig
@ -346,6 +346,13 @@ abstract class QuorumTestHarness extends Logging {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
metadataRecords.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(Features.TRANSACTION_VERSION.featureName)
.setFeatureLevel(Features.TRANSACTION_VERSION.latestTesting),
0.toShort
))
optionalMetadataRecords.foreach { metadataArguments =>
for (record <- metadataArguments) metadataRecords.add(record)
}

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch,
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, TransactionVersion}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -75,7 +76,23 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
when(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
.thenReturn(Some(numPartitions))
txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, txnConfig, time,
val brokerNode = new Node(0, "host", 10)
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
when(metadataCache.getPartitionLeaderEndpoint(
anyString,
anyInt,
any[ListenerName])
).thenReturn(Some(brokerNode))
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0,
true
)
}
txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time,
new Metrics())
txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get,
enableTransactionalIdExpiration = true)
@ -89,13 +106,6 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
} else {
Success(producerId)
})
val brokerNode = new Node(0, "host", 10)
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
when(metadataCache.getPartitionLeaderEndpoint(
anyString,
anyInt,
any[ListenerName])
).thenReturn(Some(brokerNode))
val networkClient: NetworkClient = mock(classOf[NetworkClient])
txnMarkerChannelManager = new TransactionMarkerChannelManager(
KafkaConfig.fromProps(serverProps),
@ -451,10 +461,10 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
addPartitionsOp.awaitAndVerify(txn)
val txnMetadata = transactionMetadata(txn).getOrElse(throw new IllegalStateException(s"Transaction not found $txn"))
txnRecords += new SimpleRecord(txn.txnMessageKeyBytes, TransactionLog.valueToBytes(txnMetadata.prepareNoTransit()))
txnRecords += new SimpleRecord(txn.txnMessageKeyBytes, TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), true))
txnMetadata.state = PrepareCommit
txnRecords += new SimpleRecord(txn.txnMessageKeyBytes, TransactionLog.valueToBytes(txnMetadata.prepareNoTransit()))
txnRecords += new SimpleRecord(txn.txnMessageKeyBytes, TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), true))
prepareTxnLog(partitionId)
}

View File

@ -51,7 +51,7 @@ class TransactionLogTest {
val txnMetadata = TransactionMetadata(transactionalId, producerId, producerEpoch, transactionTimeoutMs, 0)
txnMetadata.addPartitions(topicPartitions)
assertThrows(classOf[IllegalStateException], () => TransactionLog.valueToBytes(txnMetadata.prepareNoTransit()))
assertThrows(classOf[IllegalStateException], () => TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), true))
}
@Test
@ -79,7 +79,7 @@ class TransactionLogTest {
txnMetadata.addPartitions(topicPartitions)
val keyBytes = TransactionLog.keyToBytes(transactionalId)
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit())
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), true)
new SimpleRecord(keyBytes, valueBytes)
}.toSeq
@ -119,7 +119,7 @@ class TransactionLogTest {
txnMetadata.addPartitions(Set(topicPartition))
val keyBytes = TransactionLog.keyToBytes(transactionalId)
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit())
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), true)
val transactionMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(keyBytes, valueBytes)
)).records.asScala.head
@ -145,10 +145,17 @@ class TransactionLogTest {
@Test
def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, false))
assertEquals(0, txnLogValueBuffer.getShort)
}
@Test
def testSerializeTransactionLogValueToFlexibleVersion(): Unit = {
val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata, true))
assertEquals(TransactionLogValue.HIGHEST_SUPPORTED_VERSION, txnLogValueBuffer.getShort)
}
@Test
def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
val txnPartitions = new TransactionLogValue.PartitionsSchema()

View File

@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.log.UnifiedLog
import kafka.server.{ReplicaManager, RequestLocal}
import kafka.server.{MetadataCache, ReplicaManager, RequestLocal}
import kafka.utils.{Pool, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
@ -34,15 +34,19 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, TransactionVersion}
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{atLeastOnce, mock, reset, times, verify, when}
import java.util.Collections
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
@ -60,15 +64,25 @@ class TransactionStateManagerTest {
val scheduler = new MockScheduler(time)
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
when(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
.thenReturn(Some(numPartitions))
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
0,
true
)
}
val metrics = new Metrics()
val txnConfig = TransactionConfig()
val transactionManager: TransactionStateManager = new TransactionStateManager(0, scheduler,
replicaManager, txnConfig, time, metrics)
replicaManager, metadataCache, txnConfig, time, metrics)
val transactionalId1: String = "one"
val transactionalId2: String = "two"
@ -167,7 +181,7 @@ class TransactionStateManagerTest {
new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)))
val records = MemoryRecords.withRecords(startOffset, Compression.NONE,
new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit())))
new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true)))
// We create a latch which is awaited while the log is loading. This ensures that the deletion
// is triggered before the loading returns
@ -211,19 +225,19 @@ class TransactionStateManagerTest {
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
// pid1's transaction adds three more partitions
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic2", 0),
new TopicPartition("topic2", 1),
new TopicPartition("topic2", 2)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
// pid1's transaction is preparing to commit
txnMetadata1.state = PrepareCommit
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
// pid2's transaction started with three partitions
txnMetadata2.state = Ongoing
@ -231,23 +245,23 @@ class TransactionStateManagerTest {
new TopicPartition("topic3", 1),
new TopicPartition("topic3", 2)))
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), true))
// pid2's transaction is preparing to abort
txnMetadata2.state = PrepareAbort
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), true))
// pid2's transaction has aborted
txnMetadata2.state = CompleteAbort
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), true))
// pid2's epoch has advanced, with no ongoing transaction yet
txnMetadata2.state = Empty
txnMetadata2.topicPartitions.clear()
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), true))
val startOffset = 15L // it should work for any start offset
val records = MemoryRecords.withRecords(startOffset, Compression.NONE, txnRecords.toArray: _*)
@ -876,7 +890,7 @@ class TransactionStateManagerTest {
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
val startOffset = 0L
val records = MemoryRecords.withRecords(startOffset, Compression.NONE, txnRecords.toArray: _*)
@ -1039,7 +1053,7 @@ class TransactionStateManagerTest {
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
val startOffset = 0L
val records = MemoryRecords.withRecords(startOffset, Compression.NONE, txnRecords.toArray: _*)
@ -1145,7 +1159,7 @@ class TransactionStateManagerTest {
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 1),
new TopicPartition("topic1", 1)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
val startOffset = 15L
val records = MemoryRecords.withRecords(startOffset, Compression.NONE, txnRecords.toArray: _*)
@ -1164,7 +1178,7 @@ class TransactionStateManagerTest {
txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1)))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
val startOffset = 0L
val unknownKey = new TransactionLogKey()
@ -1193,4 +1207,23 @@ class TransactionStateManagerTest {
assertEquals(txnMetadata1.topicPartitions, txnMetadata.topicPartitions)
assertEquals(1, transactionManager.transactionMetadataCache(partitionId).coordinatorEpoch)
}
@ParameterizedTest
@EnumSource(classOf[TransactionVersion])
def testUsesFlexibleRecords(transactionVersion: TransactionVersion): Unit = {
val metadataCache = mock(classOf[MetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
MetadataVersion.latestTesting(),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
0,
true
)
}
val transactionManager = new TransactionStateManager(0, scheduler,
replicaManager, metadataCache, txnConfig, time, metrics)
val expectFlexibleRecords = transactionVersion.featureLevel > 0
assertEquals(expectFlexibleRecords, transactionManager.usesFlexibleRecords())
}
}

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@ -69,7 +69,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
assertEquals(2, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(3, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@ -77,6 +77,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
}
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion())
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(TransactionVersion.FEATURE_NAME).minVersion())
assertEquals(TransactionVersion.TV_2.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(TransactionVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (!cluster.isKRaftTest) {
ApiVersionsResponse.collectApis(

View File

@ -18,9 +18,11 @@
package kafka.server
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.apache.kafka.server.common.{MetadataVersion, Features => ServerFeatures}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
@ -95,10 +97,20 @@ class BrokerFeaturesTest {
val expectedFeatures = Map[String, Short](
MetadataVersion.FEATURE_NAME -> MetadataVersion.latestTesting().featureLevel(),
ServerFeatures.TRANSACTION_VERSION.featureName() -> ServerFeatures.TRANSACTION_VERSION.latestTesting(),
"kraft.version" -> 0,
"test_feature_1" -> 4,
"test_feature_2" -> 3,
"test_feature_3" -> 7)
assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def ensureDefaultSupportedFeaturesRangeMaxNotZero(unstableVersionsEnabled: Boolean): Unit = {
val brokerFeatures = BrokerFeatures.createDefault(unstableVersionsEnabled)
brokerFeatures.supportedFeatures.features().values().forEach { supportedVersionRange =>
assertNotEquals(0, supportedVersionRange.max())
}
}
}

View File

@ -63,11 +63,10 @@ public final class QuorumFeatures {
MetadataVersion.latestTesting().featureLevel() :
MetadataVersion.latestProduction().featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
features.put(feature.featureName(), VersionRange.of(
0,
enableUnstable ?
feature.latestTesting() :
feature.latestProduction()));
short maxVersion = enableUnstable ? feature.latestTesting() : feature.latestProduction();
if (maxVersion > 0) {
features.put(feature.featureName(), VersionRange.of(feature.minimumProduction(), maxVersion));
}
}
return features;
}

View File

@ -26,6 +26,8 @@ import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@ -35,6 +37,7 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuorumFeaturesTest {
@ -58,11 +61,14 @@ public class QuorumFeaturesTest {
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
short maxVersion = feature.defaultValue(MetadataVersion.LATEST_PRODUCTION);
if (maxVersion > 0) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
0,
feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
feature.minimumProduction(),
maxVersion
));
}
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false));
}
@ -73,14 +79,26 @@ public class QuorumFeaturesTest {
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()));
for (Features feature : Features.PRODUCTION_FEATURES) {
short maxVersion = feature.defaultValue(MetadataVersion.latestTesting());
if (maxVersion > 0) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
0,
feature.defaultValue(MetadataVersion.latestTesting())
feature.minimumProduction(),
maxVersion
));
}
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean unstableVersionsEnabled) {
Map<String, VersionRange> quorumFeatures = QuorumFeatures.defaultFeatureMap(unstableVersionsEnabled);
for (VersionRange range : quorumFeatures.values()) {
assertNotEquals(0, range.max());
}
}
@Test
public void testLocalSupportedFeature() {
assertEquals(VersionRange.of(0, 3), QUORUM_FEATURES.localSupportedFeature("foo"));

View File

@ -40,7 +40,8 @@ public enum Features {
* See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
*/
TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
KRAFT_VERSION("kraft.version", KRaftVersion.values());
KRAFT_VERSION("kraft.version", KRaftVersion.values()),
TRANSACTION_VERSION("transaction.version", TransactionVersion.values());
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;
@ -77,6 +78,10 @@ public enum Features {
return defaultValue(MetadataVersion.LATEST_PRODUCTION);
}
public short minimumProduction() {
return featureVersions[0].featureLevel();
}
public short latestTesting() {
return featureVersions[featureVersions.length - 1].featureLevel();
}

View File

@ -21,6 +21,8 @@ import java.util.Map;
public enum GroupVersion implements FeatureVersion {
// Version 0 is the original group coordinator prior to KIP-848.
GV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
// Version 1 enables the consumer rebalance protocol (KIP-848).
GV_1(1, MetadataVersion.IBP_4_0_IV0, Collections.emptyMap());

View File

@ -217,7 +217,7 @@ public enum MetadataVersion {
// Add ELR related supports (KIP-966).
IBP_3_9_IV1(22, "3.9", "IV1", true),
// Introduce version 1 of the GroupVersion feature (KIP-848).
// Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848) and transaction versions 1 and 2 (KIP-890)
IBP_4_0_IV0(23, "4.0", "IV0", false);
// NOTES when adding a new version:

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.Map;
public enum TransactionVersion implements FeatureVersion {
// Version 0 is the original transaction coordinator with no extra features enabled.
TV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
// Version 1 enables flexible transactional state records. (KIP-890)
TV_1(1, MetadataVersion.IBP_4_0_IV0, Collections.emptyMap()),
// Version 2 enables epoch bump per transaction and optimizations. (KIP-890)
TV_2(2, MetadataVersion.IBP_4_0_IV0, Collections.emptyMap());
public static final String FEATURE_NAME = "transaction.version";
private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
private final Map<String, Short> dependencies;
TransactionVersion(
int featureLevel,
MetadataVersion bootstrapMetadataVersion,
Map<String, Short> dependencies
) {
this.featureLevel = (short) featureLevel;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.dependencies = dependencies;
}
@Override
public short featureLevel() {
return featureLevel;
}
@Override
public String featureName() {
return FEATURE_NAME;
}
@Override
public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
}
@Override
public Map<String, Short> dependencies() {
return dependencies;
}
}

View File

@ -103,8 +103,13 @@ public class FeaturesTest {
@EnumSource(Features.class)
public void testDefaultValueAllFeatures(Features feature) {
for (FeatureVersion featureImpl : feature.featureVersions()) {
assertEquals(feature.defaultValue(featureImpl.bootstrapMetadataVersion()), featureImpl.featureLevel(),
"Failed to get the correct default for " + featureImpl);
// If features have the same bootstrapMetadataVersion, the highest level feature should be chosen.
short defaultLevel = feature.defaultValue(featureImpl.bootstrapMetadataVersion());
if (defaultLevel != featureImpl.featureLevel()) {
FeatureVersion otherFeature = feature.fromFeatureLevel(defaultLevel, true);
assertEquals(featureImpl.bootstrapMetadataVersion(), otherFeature.bootstrapMetadataVersion());
assertTrue(defaultLevel > featureImpl.featureLevel());
}
}
}

View File

@ -69,6 +69,8 @@ public class FeatureCommandTest {
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
}
// Use the first MetadataVersion that supports KIP-919
@ -85,6 +87,8 @@ public class FeatureCommandTest {
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(1)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
}
@ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)