From d74f1f847a42ffb3013eba68a275da03b970f175 Mon Sep 17 00:00:00 2001 From: "Chung, Ming-Yen" Date: Sat, 12 Oct 2024 09:42:54 +0800 Subject: [PATCH] KAFKA-17199 Add UT for TransactionLogConfig and TransactionStateManagerConfig (#17442) Reviewers: Ken Huang , Chia-Ping Tsai --- build.gradle | 5 + ...import-control-transaction-coordinator.xml | 2 + .../transaction/TransactionCoordinator.scala | 2 +- .../transaction/TransactionLogConfig.java | 8 +- .../transaction/TransactionLogConfigTest.java | 102 ++++++++++++++++++ .../TransactionStateManagerConfigTest.java | 75 +++++++++++++ 6 files changed, 189 insertions(+), 5 deletions(-) create mode 100644 transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java create mode 100644 transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigTest.java diff --git a/build.gradle b/build.gradle index 892eacfa661..e1157468bb0 100644 --- a/build.gradle +++ b/build.gradle @@ -1640,6 +1640,11 @@ project(':transaction-coordinator') { dependencies { implementation libs.jacksonDatabind implementation project(':clients') + testImplementation libs.junitJupiter + testImplementation libs.mockitoCore + + testRuntimeOnly runtimeTestLibs + generator project(':generator') } diff --git a/checkstyle/import-control-transaction-coordinator.xml b/checkstyle/import-control-transaction-coordinator.xml index de73c5ac8da..8619b04ed19 100644 --- a/checkstyle/import-control-transaction-coordinator.xml +++ b/checkstyle/import-control-transaction-coordinator.xml @@ -24,6 +24,8 @@ + + diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 72a196eca60..6105c19266a 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -50,7 +50,7 @@ object TransactionCoordinator { config.transactionLogConfig.transactionTopicPartitions, config.transactionLogConfig.transactionTopicReplicationFactor, config.transactionLogConfig.transactionTopicSegmentBytes, - config.transactionLogConfig.transactionsLoadBufferSize, + config.transactionLogConfig.transactionLoadBufferSize, config.transactionLogConfig.transactionTopicMinISR, config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs, config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs, diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java index c8f236f4d6a..9a083bc5a21 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java @@ -76,7 +76,7 @@ public final class TransactionLogConfig { private final AbstractConfig config; private final int transactionTopicMinISR; - private final int transactionsLoadBufferSize; + private final int transactionLoadBufferSize; private final short transactionTopicReplicationFactor; private final int transactionTopicPartitions; private final int transactionTopicSegmentBytes; @@ -85,7 +85,7 @@ public final class TransactionLogConfig { public TransactionLogConfig(AbstractConfig config) { this.config = config; this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); - this.transactionsLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + this.transactionLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); @@ -96,8 +96,8 @@ public final class TransactionLogConfig { return transactionTopicMinISR; } - public int transactionsLoadBufferSize() { - return transactionsLoadBufferSize; + public int transactionLoadBufferSize() { + return transactionLoadBufferSize; } public short transactionTopicReplicationFactor() { diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java new file mode 100644 index 00000000000..ea68367681c --- /dev/null +++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.common.config.AbstractConfig; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +class TransactionLogConfigTest { + @Test + void ShouldDefineAllConfigInConfigDef() { + Set declaredConfigs = Arrays.stream(TransactionLogConfig.class.getDeclaredFields()) + .filter(field -> field.getName().endsWith("_CONFIG")) + .peek(field -> field.setAccessible(true)) + .map(field -> assertDoesNotThrow(() -> (String) field.get(null))) + .collect(Collectors.toSet()); + assertEquals(declaredConfigs, TransactionLogConfig.CONFIG_DEF.names()); + } + + @Test + void ShouldGetStaticValueFromClassAttribute() { + AbstractConfig config = mock(AbstractConfig.class); + doReturn(1).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); + doReturn(2).when(config).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + doReturn((short) 3).when(config).getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); + doReturn(4).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); + doReturn(5).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); + doReturn(6).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG); + + TransactionLogConfig transactionLogConfig = new TransactionLogConfig(config); + + assertEquals(1, transactionLogConfig.transactionTopicMinISR()); + assertEquals(2, transactionLogConfig.transactionLoadBufferSize()); + assertEquals((short) 3, transactionLogConfig.transactionTopicReplicationFactor()); + assertEquals(4, transactionLogConfig.transactionTopicPartitions()); + assertEquals(5, transactionLogConfig.transactionTopicSegmentBytes()); + assertEquals(6, transactionLogConfig.producerIdExpirationCheckIntervalMs()); + + + // If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if + // it fetches the latest value from AbstractConfig with each call. + transactionLogConfig.transactionTopicMinISR(); + transactionLogConfig.transactionLoadBufferSize(); + transactionLogConfig.transactionTopicReplicationFactor(); + transactionLogConfig.transactionTopicPartitions(); + transactionLogConfig.transactionTopicSegmentBytes(); + transactionLogConfig.producerIdExpirationCheckIntervalMs(); + + verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); + verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + verify(config, times(1)).getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); + verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); + verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); + verify(config, times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG); + } + + @Test + void ShouldGetDynamicValueFromAbstractConfig() { + AbstractConfig config = mock(AbstractConfig.class); + doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); + doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG); + + TransactionLogConfig transactionLogConfig = new TransactionLogConfig(config); + + assertEquals(false, transactionLogConfig.transactionPartitionVerificationEnable()); + assertEquals(88, transactionLogConfig.producerIdExpirationMs()); + + // If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if + // it fetches the latest value from AbstractConfig with each call. + transactionLogConfig.transactionPartitionVerificationEnable(); + transactionLogConfig.producerIdExpirationMs(); + + verify(config, times(2)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); + verify(config, times(2)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG); + } +} \ No newline at end of file diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigTest.java new file mode 100644 index 00000000000..7b68f692ef8 --- /dev/null +++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionStateManagerConfigTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.common.config.AbstractConfig; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +class TransactionStateManagerConfigTest { + @Test + void ShouldDefineAllConfigInConfigDef() { + Set declaredConfigs = Arrays.stream(TransactionStateManagerConfig.class.getDeclaredFields()) + .filter(field -> field.getName().endsWith("_CONFIG")) + .peek(field -> field.setAccessible(true)) + .map(field -> assertDoesNotThrow(() -> (String) field.get(null))) + .collect(Collectors.toSet()); + assertEquals(declaredConfigs, TransactionStateManagerConfig.CONFIG_DEF.names()); + } + + @Test + void ShouldGetStaticValueFromClassAttribute() { + AbstractConfig config = mock(AbstractConfig.class); + doReturn(1).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG); + doReturn(2).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG); + doReturn(3).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG); + doReturn(4).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG); + + TransactionStateManagerConfig transactionStateManagerConfig = new TransactionStateManagerConfig(config); + + assertEquals(1, transactionStateManagerConfig.transactionMaxTimeoutMs()); + assertEquals(2, transactionStateManagerConfig.transactionalIdExpirationMs()); + assertEquals(3, transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs()); + assertEquals(4, transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs()); + + + // If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if + // it fetches the latest value from AbstractConfig with each call. + transactionStateManagerConfig.transactionMaxTimeoutMs(); + transactionStateManagerConfig.transactionalIdExpirationMs(); + transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs(); + transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs(); + + verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG); + verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG); + verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG); + verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG); + } + +} \ No newline at end of file