KAFKA-9875: Make integration tests more resilient (#8578)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Author: John Roesler, 2020-04-29 17:11:49 -05:00 (committed by GitHub)
parent 7edbff3394
commit dc4d439825
25 changed files with 185 additions and 134 deletions

File: KafkaStreamsTest.java

@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@ -848,8 +849,9 @@ public class KafkaStreamsTest {
@Test
public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
final String inputTopic = testName.getMethodName() + "-input";
final String outputTopic = testName.getMethodName() + "-output";
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String inputTopic = safeTestName + "-input";
final String outputTopic = safeTestName + "-output";
final Topology topology = new Topology();
topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
.addProcessor("process", () -> new AbstractProcessor<String, String>() {
@ -866,22 +868,24 @@ public class KafkaStreamsTest {
@Test
public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
final String inputTopic = testName.getMethodName() + "-input";
final String outputTopic = testName.getMethodName() + "-output";
final String globalTopicName = testName.getMethodName() + "-global";
final String storeName = testName.getMethodName() + "-counts";
final String globalStoreName = testName.getMethodName() + "-globalStore";
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String inputTopic = safeTestName + "-input";
final String outputTopic = safeTestName + "-output";
final String globalTopicName = safeTestName + "-global";
final String storeName = safeTestName + "-counts";
final String globalStoreName = safeTestName + "-globalStore";
final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
startStreamsAndCheckDirExists(topology, false);
}
@Test
public void statefulTopologyShouldCreateStateDirectory() throws Exception {
final String inputTopic = testName.getMethodName() + "-input";
final String outputTopic = testName.getMethodName() + "-output";
final String globalTopicName = testName.getMethodName() + "-global";
final String storeName = testName.getMethodName() + "-counts";
final String globalStoreName = testName.getMethodName() + "-globalStore";
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String inputTopic = safeTestName + "-input";
final String outputTopic = safeTestName + "-output";
final String globalTopicName = safeTestName + "-global";
final String storeName = safeTestName + "-counts";
final String globalStoreName = safeTestName + "-globalStore";
final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
startStreamsAndCheckDirExists(topology, true);
}

File: EOSUncleanShutdownIntegrationTest.java

@ -53,8 +53,8 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.junit.Assert.assertFalse;
@ -158,7 +158,7 @@ public class EOSUncleanShutdownIntegrationTest {
// the state directory should still exist with the empty checkpoint file
assertFalse(stateDir.exists());
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
}

File: GlobalKTableEOSIntegrationTest.java

@ -58,6 +58,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertNotNull;
@RunWith(Parameterized.class)
@ -108,10 +109,8 @@ public class GlobalKTableEOSIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "globalTable-eos-test-" + testName.getMethodName()
.replace('[', '_')
.replace(']', '_');
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@ -291,12 +290,9 @@ public class GlobalKTableEOSIntegrationTest {
}
private void createTopics() throws Exception {
final String suffix = testName.getMethodName()
.replace('[', '_')
.replace(']', '_');
streamTopic = "stream-" + suffix;
globalTableTopic = "globalTable-" + suffix;
CLUSTER.deleteAllTopicsAndWait(300_000L);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamTopic = "stream-" + safeTestName;
globalTableTopic = "globalTable-" + safeTestName;
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}

File: GlobalKTableIntegrationTest.java

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
import java.time.Duration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
@ -39,8 +38,8 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@ -52,12 +51,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@ -91,8 +92,8 @@ public class GlobalKTableIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "globalTableTopic-table-test-" + testName.getMethodName();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@ -300,8 +301,9 @@ public class GlobalKTableIntegrationTest {
}
private void createTopics() throws Exception {
streamTopic = "stream-" + testName.getMethodName();
globalTableTopic = "globalTable-" + testName.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamTopic = "stream-" + safeTestName;
globalTableTopic = "globalTable-" + safeTestName;
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}

File: GlobalThreadShutDownOrderTest.java

@ -54,6 +54,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
@ -101,8 +102,8 @@ public class GlobalThreadShutDownOrderTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

File: KStreamAggregationDedupIntegrationTest.java

@ -57,6 +57,7 @@ import java.util.List;
import java.util.Properties;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
/**
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
@ -88,8 +89,8 @@ public class KStreamAggregationDedupIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@ -222,8 +223,9 @@ public class KStreamAggregationDedupIntegrationTest {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testName.getMethodName();
outputTopic = "output-" + testName.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamOneInput = "stream-one-" + safeTestName;
outputTopic = "output-" + safeTestName;
CLUSTER.createTopic(streamOneInput, 3, 1);
CLUSTER.createTopic(outputTopic);
}
@ -238,9 +240,10 @@ public class KStreamAggregationDedupIntegrationTest {
final Deserializer<V> valueDeserializer,
final List<KeyValueTimestamp<K, V>> expectedRecords)
throws InterruptedException {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());

File: KStreamAggregationIntegrationTest.java

@ -88,6 +88,7 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@ -107,7 +108,7 @@ public class KStreamAggregationIntegrationTest {
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
private String streamOneInput;
private String userSessionsStream = "user-sessions";
private String userSessionsStream;
private String outputTopic;
private KGroupedStream<String, String> groupedStream;
private Reducer<String> reducer;
@ -123,8 +124,8 @@ public class KStreamAggregationIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@ -791,9 +792,10 @@ public class KStreamAggregationIntegrationTest {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testName.getMethodName();
outputTopic = "output-" + testName.getMethodName();
userSessionsStream = userSessionsStream + "-" + testName.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamOneInput = "stream-one-" + safeTestName;
outputTopic = "output-" + safeTestName;
userSessionsStream = "user-sessions-" + safeTestName;
CLUSTER.createTopic(streamOneInput, 3, 1);
CLUSTER.createTopics(userSessionsStream, outputTopic);
}
@ -814,9 +816,10 @@ public class KStreamAggregationIntegrationTest {
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
@ -835,9 +838,10 @@ public class KStreamAggregationIntegrationTest {
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());

File: KStreamRepartitionIntegrationTest.java

@ -77,6 +77,7 @@ import java.util.regex.Pattern;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -116,16 +117,12 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration = new Properties();
kafkaStreamsInstances = new ArrayList<>();
final String suffix = testName.getMethodName()
.replace('[', '_')
.replace(']', '_')
.replace(' ', '_')
.replace('=', '_');
final String safeTestName = safeUniqueTestName(getClass(), testName);
topicB = "topic-b-" + suffix;
inputTopic = "input-topic-" + suffix;
outputTopic = "output-topic-" + suffix;
applicationId = "kstream-repartition-stream-test-" + suffix;
topicB = "topic-b-" + safeTestName;
inputTopic = "input-topic-" + safeTestName;
outputTopic = "output-topic-" + safeTestName;
applicationId = "app-" + safeTestName;
CLUSTER.createTopic(inputTopic, 4, 1);
CLUSTER.createTopic(outputTopic, 1, 1);
@ -812,9 +809,10 @@ public class KStreamRepartitionIntegrationTest {
final Deserializer<V> valueSerializer,
final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-repartition-test-" + testName.getMethodName());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

File: KTableKTableForeignKeyJoinIntegrationTest.java

@ -54,6 +54,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -87,8 +88,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Before
public void before() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)

File: KTableKTableForeignKeyJoinMaterializationIntegrationTest.java

@ -50,6 +50,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -75,8 +76,9 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
@Before
public void before() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));

File: LagFetchIntegrationTest.java

@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@ -88,16 +89,17 @@ public class LagFetchIntegrationTest {
private String stateStoreName;
@Rule
public TestName name = new TestName();
public TestName testName = new TestName();
@Before
public void before() {
inputTopicName = "input-topic-" + name.getMethodName();
outputTopicName = "output-topic-" + name.getMethodName();
stateStoreName = "lagfetch-test-store" + name.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
inputTopicName = "input-topic-" + safeTestName;
outputTopicName = "output-topic-" + safeTestName;
stateStoreName = "lagfetch-test-store" + safeTestName;
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@ -106,7 +108,7 @@ public class LagFetchIntegrationTest {
consumerConfiguration = new Properties();
consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerConfiguration.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfiguration.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

File: MetricsIntegrationTest.java

@ -59,6 +59,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -72,8 +73,6 @@ public class MetricsIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final long timeout = 60000;
private final static String APPLICATION_ID_VALUE = "stream-metrics-test";
// Metric group
private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
private static final String STREAM_THREAD_NODE_METRICS_0100_TO_24 = "stream-metrics";
@ -227,14 +226,15 @@ public class MetricsIntegrationTest {
private String appId;
@Rule
public TestName name = new TestName();
public TestName testName = new TestName();
@Before
public void before() throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
appId = APPLICATION_ID_VALUE + "-" + name.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);

File: OptimizedKTableIntegrationTest.java

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@ -71,7 +72,7 @@ public class OptimizedKTableIntegrationTest {
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
@Rule
public final TestName name = new TestName();
public final TestName testName = new TestName();
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
@ -215,13 +216,13 @@ public class OptimizedKTableIntegrationTest {
}
private Properties streamsConfiguration() {
final String applicationId = "streamsApp";
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName());
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

File: QueryableStateIntegrationTest.java

@ -98,6 +98,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
@ -140,14 +141,15 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private void createTopics() throws Exception {
streamOne = streamOne + "-" + name.getMethodName();
streamConcurrent = streamConcurrent + "-" + name.getMethodName();
streamThree = streamThree + "-" + name.getMethodName();
outputTopic = outputTopic + "-" + name.getMethodName();
outputTopicConcurrent = outputTopicConcurrent + "-" + name.getMethodName();
outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + name.getMethodName();
outputTopicThree = outputTopicThree + "-" + name.getMethodName();
streamTwo = streamTwo + "-" + name.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamOne = streamOne + "-" + safeTestName;
streamConcurrent = streamConcurrent + "-" + safeTestName;
streamThree = streamThree + "-" + safeTestName;
outputTopic = outputTopic + "-" + safeTestName;
outputTopicConcurrent = outputTopicConcurrent + "-" + safeTestName;
outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + safeTestName;
outputTopicThree = outputTopicThree + "-" + safeTestName;
streamTwo = streamTwo + "-" + safeTestName;
CLUSTER.createTopics(streamOne, streamConcurrent);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
@ -191,18 +193,18 @@ public class QueryableStateIntegrationTest {
}
@Rule
public TestName name = new TestName();
public TestName testName = new TestName();
@Before
public void before() throws Exception {
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "queryable-state-" + name.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("state-" + applicationId).getPath());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

File: ResetPartitionTimeIntegrationTest.java

@ -53,9 +53,10 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -92,11 +93,11 @@ public class ResetPartitionTimeIntegrationTest {
public String processingGuarantee;
@Rule
public TestName name = new TestName();
public TestName testName = new TestName();
@Test
public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
final String appId = name.getMethodName();
final String appId = "app-" + safeUniqueTestName(getClass(), testName);
final String input = "input";
final String outputRaw = "output-raw";
@ -156,7 +157,7 @@ public class ResetPartitionTimeIntegrationTest {
assertThat(lastRecordedTimestamp, is(5000L));
} finally {
kafkaStreams.close();
cleanStateAfterTest(CLUSTER, kafkaStreams);
quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
}
}

File: RocksDBMetricsIntegrationTest.java

@ -60,6 +60,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -113,7 +114,7 @@ public class RocksDBMetricsIntegrationTest {
public String processingGuarantee;
@Rule
public TestName name = new TestName();
public TestName testName = new TestName();
@Before
public void before() throws Exception {
@ -154,10 +155,8 @@ public class RocksDBMetricsIntegrationTest {
private Properties streamsConfig() {
final Properties streamsConfiguration = new Properties();
final String suffix = name.getMethodName()
.replace('[', '_')
.replace(']', '_');
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + suffix);
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

File: StandbyTaskCreationIntegrationTest.java

@ -45,6 +45,8 @@ import org.junit.rules.TestName;
import java.util.Properties;
import java.util.function.Predicate;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@Category({IntegrationTest.class})
public class StandbyTaskCreationIntegrationTest {
@ -75,11 +77,11 @@ public class StandbyTaskCreationIntegrationTest {
}
private Properties streamsConfiguration() {
final String applicationId = "testApp" + testName.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

File: StandbyTaskEOSIntegrationTest.java

@ -51,6 +51,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertTrue;
@ -72,8 +73,8 @@ public class StandbyTaskEOSIntegrationTest {
@Parameterized.Parameter
public String eosConfig;
private final String appId = "eos-test-app";
private final String inputTopic = "input";
private String appId;
private String inputTopic;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@ -83,6 +84,9 @@ public class StandbyTaskEOSIntegrationTest {
@Before
public void createTopics() throws Exception {
final String safeTestName = safeUniqueTestName(getClass(), testName);
appId = "app-" + safeTestName;
inputTopic = "input-" + safeTestName;
CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.createTopic(inputTopic, 1, 3);
}
@ -159,10 +163,7 @@ public class StandbyTaskEOSIntegrationTest {
private Properties props(final String stateDirPath) {
final Properties streamsConfiguration = new Properties();
final String suffix = testName.getMethodName()
.replace('[', '_')
.replace(']', '_');
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + suffix);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);

File: StoreQueryIntegrationTest.java

@ -24,10 +24,10 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@ -38,7 +38,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -56,6 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@ -318,13 +318,13 @@ public class StoreQueryIntegrationTest {
}
private Properties streamsConfiguration() {
final String applicationId = "streamsApp" + testName.getMethodName();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

File: StoreUpgradeIntegrationTest.java

@ -56,29 +56,31 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@Category({IntegrationTest.class})
public class StoreUpgradeIntegrationTest {
private static String inputStream;
private static final String STORE_NAME = "store";
private String inputStream;
private KafkaStreams kafkaStreams;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@Before
public void createTopics() throws Exception {
inputStream = "input-stream-" + testName.getMethodName();
CLUSTER.createTopic(inputStream);
}
@Rule
public TestName testName = new TestName();
@Before
public void createTopics() throws Exception {
inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName);
CLUSTER.createTopic(inputStream);
}
private Properties props() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "addId-" + testName.getMethodName());
final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

File: SuppressionDurabilityIntegrationTest.java

@ -45,8 +45,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@ -56,7 +58,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@ -69,9 +70,10 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.hamcrest.CoreMatchers.is;
@ -81,12 +83,18 @@ import static org.hamcrest.Matchers.equalTo;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(SuppressionDurabilityIntegrationTest.class);
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
);
@Rule
public TestName testName = new TestName();
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
@ -107,8 +115,8 @@ public class SuppressionDurabilityIntegrationTest {
@Test
public void shouldRecoverBufferAfterShutdown() {
final String testId = "-shouldRecoverBufferAfterShutdown";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String testId = safeUniqueTestName(getClass(), testName);
final String appId = "appId_" + testId;
final String input = "input" + testId;
final String storeName = "counts";
final String outputSuppressed = "output-suppressed" + testId;
@ -243,7 +251,7 @@ public class SuppressionDurabilityIntegrationTest {
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}

File: SuppressionIntegrationTest.java

@ -66,8 +66,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
@ -150,7 +150,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -203,7 +203,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -272,7 +272,7 @@ public class SuppressionIntegrationTest {
verifyErrorShutdown(driver);
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -314,7 +314,7 @@ public class SuppressionIntegrationTest {
verifyErrorShutdown(driver);
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -374,7 +374,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -430,7 +430,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@ -492,7 +492,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
cleanStateAfterTest(CLUSTER, driver);
quietlyCleanStateAfterTest(CLUSTER, driver);
}
}

File: IntegrationTestUtils.java

@ -50,6 +50,9 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import java.io.File;
@ -91,6 +94,7 @@ import static org.junit.Assert.fail;
public class IntegrationTestUtils {
public static final long DEFAULT_TIMEOUT = 60 * 1000L;
private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
/*
* Records state transition for StreamThread
@ -111,6 +115,19 @@ public class IntegrationTestUtils {
}
}
/**
* Gives a test name that is safe to be used in application ids, topic names, etc.
* The name is safe even for parameterized methods.
*/
public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
return (testClass.getSimpleName() + testName.getMethodName())
.replace('.', '_')
.replace('[', '_')
.replace(']', '_')
.replace(' ', '_')
.replace('=', '_');
}
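A minimal usage sketch of the new helper (not part of this diff; the class, field, and topic names below are hypothetical): a test exposes the JUnit TestName rule and derives every externally visible name from safeUniqueTestName, so that parameterized and concurrently running tests cannot collide on topics or application ids.

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;

public class ExampleResilientIntegrationTest {

    @Rule
    public TestName testName = new TestName();

    private String inputTopic;
    private Properties streamsConfiguration;

    @Before
    public void before() {
        // Combines the class name and the (possibly parameterized) method name,
        // replacing characters that are illegal in topic names and application ids.
        final String safeTestName = safeUniqueTestName(getClass(), testName);
        inputTopic = "input-" + safeTestName;

        streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
    }
}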
/**
* Removes local state stores. Useful to reset state in-between integration test runs.
*
@ -145,12 +162,12 @@ public class IntegrationTestUtils {
}
}
public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
driver.cleanUp();
public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
try {
driver.cleanUp();
cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
} catch (final RuntimeException | InterruptedException e) {
LOG.warn("Ignoring failure to clean test state", e);
}
}
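A sketch of the intended call pattern (builder, streamsConfig, and CLUSTER are placeholders standing in for the fields the integration tests already declare): the helper is invoked from a finally block so that a failure while wiping local state or deleting topics is logged as a warning instead of masking the real test outcome.

final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
driver.start();
try {
    // ... produce input and assert on the output topics ...
} finally {
    driver.close();
    // Best-effort cleanup; any RuntimeException or InterruptedException is only logged.
    quietlyCleanStateAfterTest(CLUSTER, driver);
}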

File: KTableKTableForeignKeyJoinScenarioTest.java

@ -45,6 +45,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@ -239,7 +240,8 @@ public class KTableKTableForeignKeyJoinScenarioTest {
private void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + testName.getMethodName());
final String safeTestName = safeUniqueTestName(getClass(), testName);
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

File: StreamsTestUtils.java

@ -106,7 +106,9 @@ public final class StreamsTestUtils {
kafkaStreams.start();
assertThat(
"KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
equalTo(true)
);
}
public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {