mirror of https://github.com/apache/kafka.git
KAFKA-17199 Add UT for TransactionLogConfig and TransactionStateManagerConfig (#17442)
Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
fef105e5ad
commit
d74f1f847a
|
@ -1640,6 +1640,11 @@ project(':transaction-coordinator') {
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation libs.jacksonDatabind
|
implementation libs.jacksonDatabind
|
||||||
implementation project(':clients')
|
implementation project(':clients')
|
||||||
|
testImplementation libs.junitJupiter
|
||||||
|
testImplementation libs.mockitoCore
|
||||||
|
|
||||||
|
testRuntimeOnly runtimeTestLibs
|
||||||
|
|
||||||
generator project(':generator')
|
generator project(':generator')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,8 @@
|
||||||
|
|
||||||
<!-- common library dependencies -->
|
<!-- common library dependencies -->
|
||||||
<allow pkg="java" />
|
<allow pkg="java" />
|
||||||
|
<allow pkg="org.junit" />
|
||||||
|
<allow pkg="org.mockito" />
|
||||||
<!-- no one depends on the server -->
|
<!-- no one depends on the server -->
|
||||||
<disallow pkg="kafka" />
|
<disallow pkg="kafka" />
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ object TransactionCoordinator {
|
||||||
config.transactionLogConfig.transactionTopicPartitions,
|
config.transactionLogConfig.transactionTopicPartitions,
|
||||||
config.transactionLogConfig.transactionTopicReplicationFactor,
|
config.transactionLogConfig.transactionTopicReplicationFactor,
|
||||||
config.transactionLogConfig.transactionTopicSegmentBytes,
|
config.transactionLogConfig.transactionTopicSegmentBytes,
|
||||||
config.transactionLogConfig.transactionsLoadBufferSize,
|
config.transactionLogConfig.transactionLoadBufferSize,
|
||||||
config.transactionLogConfig.transactionTopicMinISR,
|
config.transactionLogConfig.transactionTopicMinISR,
|
||||||
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
|
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
|
||||||
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
|
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
|
||||||
|
|
|
@ -76,7 +76,7 @@ public final class TransactionLogConfig {
|
||||||
|
|
||||||
private final AbstractConfig config;
|
private final AbstractConfig config;
|
||||||
private final int transactionTopicMinISR;
|
private final int transactionTopicMinISR;
|
||||||
private final int transactionsLoadBufferSize;
|
private final int transactionLoadBufferSize;
|
||||||
private final short transactionTopicReplicationFactor;
|
private final short transactionTopicReplicationFactor;
|
||||||
private final int transactionTopicPartitions;
|
private final int transactionTopicPartitions;
|
||||||
private final int transactionTopicSegmentBytes;
|
private final int transactionTopicSegmentBytes;
|
||||||
|
@ -85,7 +85,7 @@ public final class TransactionLogConfig {
|
||||||
public TransactionLogConfig(AbstractConfig config) {
|
public TransactionLogConfig(AbstractConfig config) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
|
this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
|
||||||
this.transactionsLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
|
this.transactionLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
|
||||||
this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
|
this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
|
||||||
this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
|
this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
|
||||||
this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
|
this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
|
||||||
|
@ -96,8 +96,8 @@ public final class TransactionLogConfig {
|
||||||
return transactionTopicMinISR;
|
return transactionTopicMinISR;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int transactionsLoadBufferSize() {
|
public int transactionLoadBufferSize() {
|
||||||
return transactionsLoadBufferSize;
|
return transactionLoadBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public short transactionTopicReplicationFactor() {
|
public short transactionTopicReplicationFactor() {
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.coordinator.transaction;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
class TransactionLogConfigTest {
|
||||||
|
@Test
|
||||||
|
void ShouldDefineAllConfigInConfigDef() {
|
||||||
|
Set<String> declaredConfigs = Arrays.stream(TransactionLogConfig.class.getDeclaredFields())
|
||||||
|
.filter(field -> field.getName().endsWith("_CONFIG"))
|
||||||
|
.peek(field -> field.setAccessible(true))
|
||||||
|
.map(field -> assertDoesNotThrow(() -> (String) field.get(null)))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
assertEquals(declaredConfigs, TransactionLogConfig.CONFIG_DEF.names());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void ShouldGetStaticValueFromClassAttribute() {
|
||||||
|
AbstractConfig config = mock(AbstractConfig.class);
|
||||||
|
doReturn(1).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
|
||||||
|
doReturn(2).when(config).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
|
||||||
|
doReturn((short) 3).when(config).getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
|
||||||
|
doReturn(4).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
|
||||||
|
doReturn(5).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
|
||||||
|
doReturn(6).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
|
||||||
|
|
||||||
|
TransactionLogConfig transactionLogConfig = new TransactionLogConfig(config);
|
||||||
|
|
||||||
|
assertEquals(1, transactionLogConfig.transactionTopicMinISR());
|
||||||
|
assertEquals(2, transactionLogConfig.transactionLoadBufferSize());
|
||||||
|
assertEquals((short) 3, transactionLogConfig.transactionTopicReplicationFactor());
|
||||||
|
assertEquals(4, transactionLogConfig.transactionTopicPartitions());
|
||||||
|
assertEquals(5, transactionLogConfig.transactionTopicSegmentBytes());
|
||||||
|
assertEquals(6, transactionLogConfig.producerIdExpirationCheckIntervalMs());
|
||||||
|
|
||||||
|
|
||||||
|
// If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if
|
||||||
|
// it fetches the latest value from AbstractConfig with each call.
|
||||||
|
transactionLogConfig.transactionTopicMinISR();
|
||||||
|
transactionLogConfig.transactionLoadBufferSize();
|
||||||
|
transactionLogConfig.transactionTopicReplicationFactor();
|
||||||
|
transactionLogConfig.transactionTopicPartitions();
|
||||||
|
transactionLogConfig.transactionTopicSegmentBytes();
|
||||||
|
transactionLogConfig.producerIdExpirationCheckIntervalMs();
|
||||||
|
|
||||||
|
verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
|
||||||
|
verify(config, times(1)).getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void ShouldGetDynamicValueFromAbstractConfig() {
|
||||||
|
AbstractConfig config = mock(AbstractConfig.class);
|
||||||
|
doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
|
||||||
|
doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
|
||||||
|
|
||||||
|
TransactionLogConfig transactionLogConfig = new TransactionLogConfig(config);
|
||||||
|
|
||||||
|
assertEquals(false, transactionLogConfig.transactionPartitionVerificationEnable());
|
||||||
|
assertEquals(88, transactionLogConfig.producerIdExpirationMs());
|
||||||
|
|
||||||
|
// If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if
|
||||||
|
// it fetches the latest value from AbstractConfig with each call.
|
||||||
|
transactionLogConfig.transactionPartitionVerificationEnable();
|
||||||
|
transactionLogConfig.producerIdExpirationMs();
|
||||||
|
|
||||||
|
verify(config, times(2)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
|
||||||
|
verify(config, times(2)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.kafka.coordinator.transaction;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
class TransactionStateManagerConfigTest {
|
||||||
|
@Test
|
||||||
|
void ShouldDefineAllConfigInConfigDef() {
|
||||||
|
Set<String> declaredConfigs = Arrays.stream(TransactionStateManagerConfig.class.getDeclaredFields())
|
||||||
|
.filter(field -> field.getName().endsWith("_CONFIG"))
|
||||||
|
.peek(field -> field.setAccessible(true))
|
||||||
|
.map(field -> assertDoesNotThrow(() -> (String) field.get(null)))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
assertEquals(declaredConfigs, TransactionStateManagerConfig.CONFIG_DEF.names());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void ShouldGetStaticValueFromClassAttribute() {
|
||||||
|
AbstractConfig config = mock(AbstractConfig.class);
|
||||||
|
doReturn(1).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG);
|
||||||
|
doReturn(2).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG);
|
||||||
|
doReturn(3).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG);
|
||||||
|
doReturn(4).when(config).getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG);
|
||||||
|
|
||||||
|
TransactionStateManagerConfig transactionStateManagerConfig = new TransactionStateManagerConfig(config);
|
||||||
|
|
||||||
|
assertEquals(1, transactionStateManagerConfig.transactionMaxTimeoutMs());
|
||||||
|
assertEquals(2, transactionStateManagerConfig.transactionalIdExpirationMs());
|
||||||
|
assertEquals(3, transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs());
|
||||||
|
assertEquals(4, transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs());
|
||||||
|
|
||||||
|
|
||||||
|
// If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if
|
||||||
|
// it fetches the latest value from AbstractConfig with each call.
|
||||||
|
transactionStateManagerConfig.transactionMaxTimeoutMs();
|
||||||
|
transactionStateManagerConfig.transactionalIdExpirationMs();
|
||||||
|
transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs();
|
||||||
|
transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs();
|
||||||
|
|
||||||
|
verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG);
|
||||||
|
verify(config, times(1)).getInt(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue