KAFKA-16736 Remove `offsets.commit.required.acks` in 4.0 (#16938)

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-08-21 05:14:04 +02:00 committed by GitHub
parent 7b6d4fbc57
commit 944c1353a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 5 additions and 50 deletions

View File

@ -36,7 +36,6 @@ import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
import scala.annotation.nowarn
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max
@ -1768,7 +1767,6 @@ object GroupCoordinator {
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}
@nowarn("cat=deprecation")
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
config.groupCoordinatorConfig.offsetMetadataMaxSize,
config.groupCoordinatorConfig.offsetsLoadBufferSize,
@ -1778,8 +1776,7 @@ object GroupCoordinator {
config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
config.groupCoordinatorConfig.offsetTopicCompressionType,
config.groupCoordinatorConfig.offsetCommitTimeoutMs,
config.groupCoordinatorConfig.offsetCommitRequiredAcks
config.groupCoordinatorConfig.offsetCommitTimeoutMs
)
private[group] def apply(

View File

@ -330,7 +330,7 @@ class GroupMetadataManager(brokerId: Int,
// call replica manager to append the group message
replicaManager.appendRecords(
timeout = config.offsetCommitTimeoutMs.toLong,
requiredAcks = config.offsetCommitRequiredAcks,
requiredAcks = -1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = records,

View File

@ -1067,11 +1067,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(principalBuilderClass != null, s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must be non-null")
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) {
warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.")
}
}
/**

View File

@ -56,7 +56,6 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{immutable, _}
@ -80,7 +79,6 @@ class GroupMetadataManagerTest {
val defaultRequireStable = false
val numOffsetsPartitions = 2
@nowarn("cat=deprecation")
private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
new OffsetConfig(config.groupCoordinatorConfig.offsetMetadataMaxSize,
@ -91,8 +89,7 @@ class GroupMetadataManagerTest {
config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
config.groupCoordinatorConfig.offsetTopicCompressionType,
config.groupCoordinatorConfig.offsetCommitTimeoutMs,
config.groupCoordinatorConfig.offsetCommitRequiredAcks)
config.groupCoordinatorConfig.offsetCommitTimeoutMs)
}
@BeforeEach

View File

@ -982,7 +982,6 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")

View File

@ -204,11 +204,6 @@ public class GroupCoordinatorConfig {
public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
"or this timeout is reached. This is similar to the producer request timeout.";
@Deprecated
public static final String OFFSET_COMMIT_REQUIRED_ACKS_CONFIG = "offsets.commit.required.acks";
public static final short OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT = -1;
public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "DEPRECATED: The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.";
public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new ConfigDef()
.define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@ -232,8 +227,7 @@ public class GroupCoordinatorConfig {
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
.define(OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, SHORT, OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT, HIGH, OFFSET_COMMIT_REQUIRED_ACKS_DOC);
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC);
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@ -278,7 +272,6 @@ public class GroupCoordinatorConfig {
private final int offsetsLoadBufferSize;
private final int offsetsTopicPartitions;
private final short offsetsTopicReplicationFactor;
private final short offsetCommitRequiredAcks;
private final int consumerGroupMinSessionTimeoutMs;
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
@ -317,7 +310,6 @@ public class GroupCoordinatorConfig {
this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@ -331,9 +323,6 @@ public class GroupCoordinatorConfig {
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG));
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
@ -543,15 +532,6 @@ public class GroupCoordinatorConfig {
return offsetsTopicReplicationFactor;
}
/**
* DEPRECATED: The required acks before the commit can be accepted.
* In general, the default (-1) should not be overridden.
*/
@Deprecated // since 3.8
public short offsetCommitRequiredAcks() {
return offsetCommitRequiredAcks;
}
/**
* The minimum allowed session timeout for registered consumers.
*/

View File

@ -28,7 +28,6 @@ public class OffsetConfig {
public final short offsetsTopicReplicationFactor;
public final CompressionType offsetsTopicCompressionType;
public final int offsetCommitTimeoutMs;
public final short offsetCommitRequiredAcks;
/**
* Configuration settings for in-built offset management
@ -50,8 +49,6 @@ public class OffsetConfig {
* order to achieve "atomic" commits.
* @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
* commit or this timeout is reached. (Similar to the producer request timeout.)
* @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
* should not be overridden.
*/
public OffsetConfig(int maxMetadataSize,
int loadBufferSize,
@ -61,8 +58,7 @@ public class OffsetConfig {
int offsetsTopicSegmentBytes,
short offsetsTopicReplicationFactor,
CompressionType offsetsTopicCompressionType,
int offsetCommitTimeoutMs,
short offsetCommitRequiredAcks
int offsetCommitTimeoutMs
) {
this.maxMetadataSize = maxMetadataSize;
this.loadBufferSize = loadBufferSize;
@ -73,6 +69,5 @@ public class OffsetConfig {
this.offsetsTopicReplicationFactor = offsetsTopicReplicationFactor;
this.offsetsTopicCompressionType = offsetsTopicCompressionType;
this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
this.offsetCommitRequiredAcks = offsetCommitRequiredAcks;
}
}

View File

@ -68,7 +68,6 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, 555);
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 111);
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 11);
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) 0);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 333);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
@ -97,7 +96,6 @@ public class GroupCoordinatorConfigTest {
assertEquals(555, config.offsetsLoadBufferSize());
assertEquals(111, config.offsetsTopicPartitions());
assertEquals(11, config.offsetsTopicReplicationFactor());
assertEquals(0, config.offsetCommitRequiredAcks());
assertEquals(333, config.consumerGroupMinSessionTimeoutMs());
assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
@ -107,12 +105,6 @@ public class GroupCoordinatorConfigTest {
@Test
public void testInvalidConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) -2);
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 3);
assertEquals("offsets.commit.required.acks must be greater or equal to -1 and less or equal to offsets.topic.replication.factor",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 20);