KAFKA-16735; Deprecate offsets.commit.required.acks (#15931)

This patch deprecates `offsets.commit.required.acks` in Apache Kafka 3.8 as described in KIP-1041: https://cwiki.apache.org/confluence/x/9YobEg.

Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-05-13 20:30:34 +02:00 committed by GitHub
parent 5439914c32
commit f9169b7d3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 11 additions and 1 deletions

View File

@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
import scala.annotation.nowarn
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max
@ -1767,6 +1768,7 @@ object GroupCoordinator {
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}
@nowarn("cat=deprecation")
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
config.offsetMetadataMaxSize,
config.offsetsLoadBufferSize,

View File

@ -1060,6 +1060,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val offsetsTopicReplicationFactor = getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG)
val offsetsTopicPartitions = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG)
val offsetCommitTimeoutMs = getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG)
@deprecated("3.8")
val offsetCommitRequiredAcks = getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)
val offsetsTopicSegmentBytes = getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG)
val offsetsTopicCompressionType = Option(getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG)).map(value => CompressionType.forId(value)).orNull
@ -1547,6 +1548,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) {
warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.")
}
}
/**

View File

@ -55,6 +55,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{immutable, _}
@ -78,6 +79,7 @@ class GroupMetadataManagerTest {
val defaultRequireStable = false
val numOffsetsPartitions = 2
@nowarn("cat=deprecation")
private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
new OffsetConfig(config.offsetMetadataMaxSize,

View File

@ -149,9 +149,10 @@ public class GroupCoordinatorConfig {
public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
"or this timeout is reached. This is similar to the producer request timeout.";
@Deprecated
public static final String OFFSET_COMMIT_REQUIRED_ACKS_CONFIG = "offsets.commit.required.acks";
public static final short OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT = -1;
public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.";
public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "DEPRECATED: The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.";
/**
* The timeout used to wait for a new member in milliseconds.