MINOR: Migrate tests in o.a.k.streams to JUnit 5 (except KafkaStreamsTest) (#15942)

Reviewers: Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Bruno Cadonna 2024-05-17 21:31:05 +02:00 committed by GitHub
parent 5fa4821444
commit 69fc4c5da4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 83 additions and 97 deletions

View File

@ -20,7 +20,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.processor.internals.StreamThread;
/**

View File

@ -16,16 +16,14 @@
*/
package org.apache.kafka.streams;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@Timeout(600)
public class KeyValueTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
@Test
public void shouldHaveSameEqualsAndHashCode() {
@ -49,28 +47,28 @@ public class KeyValueTest {
assertEquals(kv.hashCode(), copyOfCopyOfKV.hashCode());
// Inequality scenarios
assertNotEquals("must be false for null", null, kv);
assertNotEquals("must be false if key is non-null and other key is null", kv, KeyValue.pair(null, kv.value));
assertNotEquals("must be false if value is non-null and other value is null", kv, KeyValue.pair(kv.key, null));
assertNotEquals(null, kv, "must be false for null");
assertNotEquals(kv, KeyValue.pair(null, kv.value), "must be false if key is non-null and other key is null");
assertNotEquals(kv, KeyValue.pair(kv.key, null), "must be false if value is non-null and other value is null");
final KeyValue<Long, Long> differentKeyType = KeyValue.pair(1L, kv.value);
assertNotEquals("must be false for different key types", kv, differentKeyType);
assertNotEquals(kv, differentKeyType, "must be false for different key types");
final KeyValue<String, String> differentValueType = KeyValue.pair(kv.key, "anyString");
assertNotEquals("must be false for different value types", kv, differentValueType);
assertNotEquals(kv, differentValueType, "must be false for different value types");
final KeyValue<Long, String> differentKeyValueTypes = KeyValue.pair(1L, "anyString");
assertNotEquals("must be false for different key and value types", kv, differentKeyValueTypes);
assertNotEquals("must be false for different types of objects", kv, new Object());
assertNotEquals(kv, differentKeyValueTypes, "must be false for different key and value types");
assertNotEquals(kv, new Object(), "must be false for different types of objects");
final KeyValue<String, Long> differentKey = KeyValue.pair(kv.key + "suffix", kv.value);
assertNotEquals("must be false if key is different", kv, differentKey);
assertNotEquals("must be false if key is different", differentKey, kv);
assertNotEquals(kv, differentKey, "must be false if key is different");
assertNotEquals(differentKey, kv, "must be false if key is different");
final KeyValue<String, Long> differentValue = KeyValue.pair(kv.key, kv.value + 1L);
assertNotEquals("must be false if value is different", kv, differentValue);
assertNotEquals("must be false if value is different", differentValue, kv);
assertNotEquals(kv, differentValue, "must be false if value is different");
assertNotEquals(differentValue, kv, "must be false if value is different");
final KeyValue<String, Long> differentKeyAndValue = KeyValue.pair(kv.key + "suffix", kv.value + 1L);
assertNotEquals("must be false if key and value are different", kv, differentKeyAndValue);
assertNotEquals("must be false if key and value are different", differentKeyAndValue, kv);
assertNotEquals(kv, differentKeyAndValue, "must be false if key and value are different");
assertNotEquals(differentKeyAndValue, kv, "must be false if key and value are different");
}
}

View File

@ -62,10 +62,9 @@ import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.time.Instant;
@ -85,14 +84,13 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Timeout(600)
public class StreamsBuilderTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final String STREAM_TOPIC = "stream-topic";
@ -106,7 +104,7 @@ public class StreamsBuilderTest {
private Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Before
@BeforeEach
public void before() {
props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
}
@ -1558,21 +1556,21 @@ public class StreamsBuilderTest {
private static void assertNamesForOperation(final ProcessorTopology topology, final String... expected) {
final List<ProcessorNode<?, ?, ?, ?>> processors = topology.processors();
assertEquals("Invalid number of expected processors", expected.length, processors.size());
assertEquals(expected.length, processors.size(), "Invalid number of expected processors");
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processors.get(i).name());
}
}
private static void assertNamesForStateStore(final List<StateStore> stores, final String... expected) {
assertEquals("Invalid number of expected state stores", expected.length, stores.size());
assertEquals(expected.length, stores.size(), "Invalid number of expected state stores");
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], stores.get(i).name());
}
}
private static void assertTypesForStateStore(final List<StateStore> stores, final Class<?>... expected) {
assertEquals("Invalid number of expected state stores", expected.length, stores.size());
assertEquals(expected.length, stores.size(), "Invalid number of expected state stores");
for (int i = 0; i < expected.length; i++) {
StateStore store = stores.get(i);
while (store instanceof WrappedStateStore && !(expected[i].isInstance(store))) {

View File

@ -37,10 +37,9 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.nio.charset.StandardCharsets;
@ -80,17 +79,16 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Timeout(600)
public class StreamsConfigTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private final Properties props = new Properties();
private StreamsConfig streamsConfig;
@ -98,7 +96,7 @@ public class StreamsConfigTest {
private final String clientId = "client";
private final int threadIdx = 1;
@Before
@BeforeEach
public void setUp() {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@ -240,16 +238,16 @@ public class StreamsConfigTest {
serializer.configure(serializerConfigs, true);
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)),
"Should get the original string after serialization and deserialization with the configured encoding"
);
serializer.configure(serializerConfigs, false);
assertEquals(
"Should get the original string after serialization and deserialization with the configured encoding",
str,
streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))
streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)),
"Should get the original string after serialization and deserialization with the configured encoding"
);
}
@ -1023,7 +1021,7 @@ public class StreamsConfigTest {
public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
final String expectedOptimizeConfig = "none";
final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"none\"");
}
@Test
@ -1032,7 +1030,7 @@ public class StreamsConfigTest {
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
final StreamsConfig config = new StreamsConfig(props);
final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"all\"");
}
@Test
@ -1046,7 +1044,7 @@ public class StreamsConfigTest {
public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
final String actualDefaultStoreType = streamsConfig.getString(DEFAULT_DSL_STORE_CONFIG);
assertEquals("default.dsl.store should be \"rocksDB\"", expectedDefaultStoreType, actualDefaultStoreType);
assertEquals(expectedDefaultStoreType, actualDefaultStoreType, "default.dsl.store should be \"rocksDB\"");
}
@SuppressWarnings("deprecation")
@ -1056,7 +1054,7 @@ public class StreamsConfigTest {
props.put(DEFAULT_DSL_STORE_CONFIG, expectedDefaultStoreType);
final StreamsConfig config = new StreamsConfig(props);
final String actualDefaultStoreType = config.getString(DEFAULT_DSL_STORE_CONFIG);
assertEquals("default.dsl.store should be \"in_memory\"", expectedDefaultStoreType, actualDefaultStoreType);
assertEquals(expectedDefaultStoreType, actualDefaultStoreType, "default.dsl.store should be \"in_memory\"");
}
@SuppressWarnings("deprecation")
@ -1071,9 +1069,10 @@ public class StreamsConfigTest {
final Class<?> expectedDefaultStoreType = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
final Class<?> actualDefaultStoreType = streamsConfig.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
assertEquals(
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType,
expectedDefaultStoreType,
actualDefaultStoreType);
expectedDefaultStoreType,
actualDefaultStoreType,
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType
);
}
@Test
@ -1083,9 +1082,10 @@ public class StreamsConfigTest {
final StreamsConfig config = new StreamsConfig(props);
final Class<?> actualDefaultStoreType = config.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
assertEquals(
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType,
expectedDefaultStoreType,
actualDefaultStoreType);
expectedDefaultStoreType,
actualDefaultStoreType,
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType
);
}
@SuppressWarnings("deprecation")

View File

@ -51,15 +51,13 @@ import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.internal.util.collections.Sets;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Duration;
import java.util.Arrays;
@ -73,17 +71,16 @@ import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@Timeout(600)
@ExtendWith(MockitoExtension.class)
public class TopologyTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
@Mock
private StoreBuilder<MockKeyValueStore> storeBuilder;
@ -93,7 +90,7 @@ public class TopologyTest {
private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
private StreamsConfig streamsConfig;
@Before
@BeforeEach
public void setUp() {
final HashMap<String, Object> configs = new HashMap<>();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId");
@ -330,8 +327,6 @@ public class TopologyTest {
private void mockStoreBuilder() {
when(storeBuilder.name()).thenReturn("store");
when(storeBuilder.logConfig()).thenReturn(Collections.emptyMap());
when(storeBuilder.loggingEnabled()).thenReturn(false);
}
@Test
@ -341,8 +336,6 @@ public class TopologyTest {
final StoreBuilder<?> otherStoreBuilder = mock(StoreBuilder.class);
when(otherStoreBuilder.name()).thenReturn("store");
when(otherStoreBuilder.logConfig()).thenReturn(Collections.emptyMap());
when(otherStoreBuilder.loggingEnabled()).thenReturn(false);
try {
topology.addStateStore(otherStoreBuilder);
fail("Should have thrown TopologyException for same store name with different StoreBuilder");
@ -2098,18 +2091,16 @@ public class TopologyTest {
final KTable<Object, Object> table = builder.table("input-topic");
table.mapValues((readOnlyKey, value) -> null);
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
" --> KTABLE-MAPVALUES-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n\n",
describe.toString());
assertEquals("Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [])\n" +
" --> KTABLE-MAPVALUES-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n\n", describe.toString());
}
@Test
@ -2121,7 +2112,7 @@ public class TopologyTest {
Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>with(null, null)
.withStoreType(Materialized.StoreType.IN_MEMORY));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
@ -2147,7 +2138,7 @@ public class TopologyTest {
(readOnlyKey, value) -> null,
Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
@ -2168,7 +2159,7 @@ public class TopologyTest {
final KTable<Object, Object> table = builder.table("input-topic");
table.filter((key, value) -> false);
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
@ -2188,7 +2179,7 @@ public class TopologyTest {
final KTable<Object, Object> table = builder.table("input-topic");
table.filter((key, value) -> false, Materialized.with(null, null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
@ -2213,7 +2204,7 @@ public class TopologyTest {
table.filter((key, value) -> false, Materialized.as("store-name"));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +