KAFKA-17336 Add IT to make sure the production MV does not use unstable version of LIST_OFFSET (#16893)

- due to the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is true, so we can't test the scenario of ListOffsetsRequest is unstable version. We want to test this case in this PR
- get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2024-08-23 03:47:55 +08:00 committed by GitHub
parent 246b165456
commit a3aa6372ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 59 additions and 14 deletions

View File

@ -139,7 +139,7 @@ object Partition {
new Partition(topicPartition,
_topicId = topicId,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
interBrokerProtocolVersion = replicaManager.metadataCache.metadataVersion(),
localBrokerId = replicaManager.config.brokerId,
localBrokerEpochSupplier = replicaManager.brokerEpochSupplier,
time = time,

View File

@ -33,7 +33,6 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
@ -180,10 +179,6 @@ class TransactionMarkerChannelManager(
private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn]
private val writeTxnMarkersRequestVersion: Short =
if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
else 0
metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers)
metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size)
@ -261,7 +256,9 @@ class TransactionMarkerChannelManager(
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)
val request = new WriteTxnMarkersRequest.Builder(
metadataCache.metadataVersion().writeTxnMarkersRequestVersion(), markersToSend
)
new RequestAndCompletionHandler(
currentTimeMs,

View File

@ -455,7 +455,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
} else if (offsetCommitRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
@ -1799,7 +1799,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val joinGroupRequest = request.body[JoinGroupRequest]
if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (joinGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
@ -1829,7 +1829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val syncGroupRequest = request.body[SyncGroupRequest]
if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (syncGroupRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
@ -1898,7 +1898,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val heartbeatRequest = request.body[HeartbeatRequest]
if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
if (heartbeatRequest.data.groupInstanceId != null && metadataCache.metadataVersion().isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
@ -2529,8 +2529,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def ensureInterBrokerVersion(version: MetadataVersion): Unit = {
if (config.interBrokerProtocolVersion.isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}")
if (metadataCache.metadataVersion().isLessThan(version))
throw new UnsupportedVersionException(s"metadata.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}")
}
def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {

View File

@ -92,6 +92,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
)
}
when(metadataCache.metadataVersion())
.thenReturn(MetadataVersion.latestProduction())
txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time,
new Metrics())
txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get,

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.RequestAndCompletionHandler
import org.junit.jupiter.api.Assertions._
@ -86,6 +87,8 @@ class TransactionMarkerChannelManagerTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
when(txnStateManager.getTransactionState(ArgumentMatchers.eq(transactionalId2)))
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
when(metadataCache.metadataVersion())
.thenReturn(MetadataVersion.latestProduction())
}
@Test

View File

@ -129,7 +129,7 @@ class KafkaApisTest extends Logging {
private val brokerId = 1
// KRaft tests should override this with a KRaftMetadataCache
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting())
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private var brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
@ -2919,6 +2919,8 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
@ -2926,6 +2928,8 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
@ -2933,6 +2937,8 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
@ -2940,6 +2946,8 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
@ -2947,6 +2955,8 @@ class KafkaApisTest extends Logging {
@Test
def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
assertThrows(classOf[UnsupportedVersionException],
() => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
@ -8748,6 +8758,8 @@ class KafkaApisTest extends Logging {
).build()
val requestChannelRequest = buildRequest(joinGroupRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleJoinGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
@ -8766,6 +8778,8 @@ class KafkaApisTest extends Logging {
).build()
val requestChannelRequest = buildRequest(syncGroupRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleSyncGroupRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)
@ -8783,6 +8797,8 @@ class KafkaApisTest extends Logging {
.setGenerationId(1)
).build()
val requestChannelRequest = buildRequest(heartbeatRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleHeartbeatRequest(requestChannelRequest)
@ -8812,6 +8828,9 @@ class KafkaApisTest extends Logging {
).build()
val requestChannelRequest = buildRequest(offsetCommitRequest)
metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_2_2_IV1)
brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
kafkaApis = createKafkaApis(IBP_2_2_IV1)
kafkaApis.handleOffsetCommitRequest(requestChannelRequest, RequestLocal.withThreadConfinedCaching)

View File

@ -646,6 +646,14 @@ public enum MetadataVersion {
return version != lowVersion;
}
public short writeTxnMarkersRequestVersion() {
if (isAtLeast(IBP_2_8_IV0)) {
return 1;
} else {
return 0;
}
}
public boolean isAtLeast(MetadataVersion otherVersion) {
return this.compareTo(otherVersion) >= 0;
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.server.common;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordVersion;
import org.junit.jupiter.api.Test;
@ -466,6 +467,20 @@ class MetadataVersionTest {
" to be less than the latest of " + MetadataVersion.latestTesting());
}
/**
* We need to ensure that the latest production MV doesn't inadvertently rely on an unstable
* request version. Currently, the broker selects the version for some inter-broker RPCs based on the MV
* rather than using the supported version from the ApiResponse.
*/
@Test
public void testProductionMetadataDontUseUnstableApiVersion() {
MetadataVersion mv = MetadataVersion.latestProduction();
assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false));
assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false));
assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false));
assertTrue(mv.writeTxnMarkersRequestVersion() <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(false));
}
@Test
public void assertLatestProductionIsProduction() {
assertTrue(LATEST_PRODUCTION.isProduction());