KAFKA-17880 Move integration test from streams module to streams/integration-tests module (#17615)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-11-01 18:21:06 +08:00 committed by GitHub
parent 346fdbafc5
commit 5a3b544d61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
93 changed files with 280 additions and 176 deletions

View File

@ -2652,16 +2652,6 @@ project(':streams') {
testImplementation project(':metadata')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server')
testImplementation project(':core')
testImplementation project(':tools')
testImplementation project(':test-common')
testImplementation project(':storage')
testImplementation project(':group-coordinator')
testImplementation project(':transaction-coordinator')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':server')
testImplementation libs.reload4j
testImplementation libs.junitJupiter
testImplementation libs.bcpkix
@ -2669,7 +2659,6 @@ project(':streams') {
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.junitPlatformSuiteEngine // supports suite test
testImplementation project(':group-coordinator')
testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs
@ -2761,6 +2750,7 @@ project(':streams') {
task testAll(
dependsOn: [
':streams:test',
':streams:integration-tests',
':streams:test-utils:test',
':streams:streams-scala:test',
':streams:upgrade-system-tests-0100:test',
@ -2803,18 +2793,12 @@ project(':streams:streams-scala') {
api project(':streams')
api libs.scalaLibrary
testImplementation project(':group-coordinator')
testImplementation project(':core')
testImplementation project(':test-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':streams:test-utils')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.hamcrest
testRuntimeOnly runtimeTestLibs
}
@ -2848,6 +2832,58 @@ project(':streams:streams-scala') {
}
}
project(':streams:integration-tests') {
apply plugin: 'scala'
base {
archivesName = "kafka-streams-integration-tests"
}
dependencies {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':group-coordinator')
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage')
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams:streams-scala')
testImplementation project(':test-common')
testImplementation project(':tools')
testImplementation project(':transaction-coordinator')
testImplementation libs.bcpkix
testImplementation libs.hamcrest
testImplementation libs.junitJupiter
testImplementation libs.junitPlatformSuiteEngine // supports suite test
testImplementation libs.mockitoCore
testImplementation libs.reload4j
testImplementation libs.slf4jApi
testImplementation project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs
}
sourceSets {
// Set java/scala source folders in the `scala` block to enable joint compilation
main {
java {
srcDirs = []
}
scala {
srcDirs = []
}
}
test {
java {
srcDirs = []
}
scala {
srcDirs = ["src/test/java", "src/test/scala"]
}
}
}
}
project(':streams:test-utils') {
base {
archivesName = "kafka-streams-test-utils"

View File

@ -85,6 +85,7 @@ include 'clients',
'storage:api',
'streams',
'streams:examples',
'streams:integration-tests',
'streams:streams-scala',
'streams:test-utils',
'streams:upgrade-system-tests-0100',

View File

@ -60,6 +60,7 @@ import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -153,7 +154,7 @@ public abstract class AbstractResetIntegrationTest {
protected static final int TIMEOUT_MULTIPLIER = 30;
void prepareTest(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
prepareConfigs(appID);
prepareEnvironment();
@ -194,7 +195,7 @@ public abstract class AbstractResetIntegrationTest {
@Test
public void testResetWhenInternalTopicsAreSpecified(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
@ -222,7 +223,7 @@ public abstract class AbstractResetIntegrationTest {
@Test
public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
@ -267,7 +268,7 @@ public abstract class AbstractResetIntegrationTest {
cluster.createTopic(INTERMEDIATE_USER_TOPIC);
}
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
@ -63,7 +62,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;

View File

@ -58,8 +58,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

View File

@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Timeout(600)
@Tag("integration")

View File

@ -99,10 +99,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;

View File

@ -64,7 +64,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@ -61,8 +61,8 @@ import java.util.Map;
import java.util.Properties;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;

View File

@ -57,7 +57,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

View File

@ -57,7 +57,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
@ -40,7 +40,7 @@ import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -71,7 +71,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

View File

@ -79,6 +79,7 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -159,7 +160,7 @@ public class IQv2IntegrationTest {
);
final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String safeTestName = safeUniqueTestName(testInfo);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
}
@ -423,7 +424,7 @@ public class IQv2IntegrationTest {
// Discard the basic streams and replace with test-specific topology
kafkaStreams.close();
final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String safeTestName = safeUniqueTestName(testInfo);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();

View File

@ -63,7 +63,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;

View File

@ -60,7 +60,7 @@ import java.util.Properties;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
/**
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled

View File

@ -90,7 +90,7 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

View File

@ -54,9 +54,9 @@ import java.util.Set;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;

View File

@ -76,7 +76,7 @@ import java.util.stream.IntStream;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@ -65,10 +65,10 @@ import java.util.Set;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;

View File

@ -54,7 +54,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(600)

View File

@ -53,6 +53,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Timeout(600)
@Tag("integration")
public class KTableSourceTopicRestartIntegrationTest {
@ -99,7 +101,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@BeforeEach
public void before(final TestInfo testInfo) throws Exception {
final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String safeTestName = safeUniqueTestName(testInfo);
sourceTopic = SOURCE_TOPIC + "-" + safeTestName;
CLUSTER.createTopic(sourceTopic);

View File

@ -58,6 +58,7 @@ import java.util.List;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Tag("integration")
@Timeout(600)
@ -98,7 +99,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
public void before(final TestInfo testName) throws Exception {
mockTime = CLUSTER.time;
final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
final String appID = safeUniqueTestName(testName);
commonClientConfig = new Properties();
commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

View File

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.internals.StreamThread;
import java.util.ArrayList;

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsBuilder;
@ -67,8 +66,8 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@ -60,7 +60,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -42,7 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -85,9 +85,9 @@ import static java.util.Collections.singleton;
import static org.apache.kafka.streams.KeyQueryMetadata.NOT_AVAILABLE;
import static org.apache.kafka.streams.KeyValue.pair;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
@ -443,7 +443,7 @@ public class NamedTopologyIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
streams.addNamedTopology(topology2Builder.build()).all().get();
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
@ -461,7 +461,7 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build()).all().get();
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));

View File

@ -63,8 +63,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

View File

@ -59,10 +59,10 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.KeyValue.pair;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -104,11 +104,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.state.QueryableStoreTypes.sessionStore;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

View File

@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -139,7 +140,7 @@ public class RegexSourceIntegrationTest {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
IntegrationTestUtils.safeUniqueTestName(testInfo),
safeUniqueTestName(testInfo),
CLUSTER.bootstrapServers(),
STRING_SERDE_CLASSNAME,
STRING_SERDE_CLASSNAME,

View File

@ -47,6 +47,7 @@ import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -99,7 +100,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-server", cluster.bootstrapServers(),
@ -123,7 +124,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhenInputTopicAbsent(final TestInfo testInfo) {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-server", cluster.bootstrapServers(),
@ -139,7 +140,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhenIntermediateTopicAbsent(final TestInfo testInfo) {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-server", cluster.bootstrapServers(),
@ -155,7 +156,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist(final TestInfo testInfo) {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-server", cluster.bootstrapServers(),
@ -171,7 +172,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal(final TestInfo testInfo) {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
"--bootstrap-server", cluster.bootstrapServers(),
@ -187,7 +188,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT * 100));
@ -224,7 +225,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
@ -266,7 +267,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN
@ -313,7 +314,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test
public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String appID = safeUniqueTestName(testInfo);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN

View File

@ -55,7 +55,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -100,11 +100,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForActiveRestoringTask;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;

View File

@ -55,8 +55,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

View File

@ -51,7 +51,7 @@ import java.util.Properties;
import static java.time.Duration.ofMinutes;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

View File

@ -69,7 +69,7 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

View File

@ -47,7 +47,7 @@ import java.time.Duration;
import java.util.Properties;
import java.util.function.Predicate;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Timeout(600)
@Tag("integration")

View File

@ -58,8 +58,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")

View File

@ -72,10 +72,10 @@ import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;

View File

@ -58,7 +58,7 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Tag("integration")
public class StoreUpgradeIntegrationTest {

View File

@ -58,7 +58,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@ -70,9 +70,9 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

View File

@ -70,9 +70,9 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

View File

@ -56,7 +56,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Timeout(600)
@Tag("integration")

View File

@ -50,7 +50,7 @@ import java.util.function.Supplier;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

View File

@ -53,7 +53,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -73,7 +73,7 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

View File

@ -74,7 +74,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

View File

@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Time;
@ -57,14 +56,12 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
@ -219,27 +216,6 @@ public class IntegrationTestUtils {
}
}
public static String safeUniqueTestName(final TestInfo testInfo) {
final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
return safeUniqueTestName(methodName);
}
private static String safeUniqueTestName(final String testName) {
return sanitize(testName + Uuid.randomUuid().toString());
}
private static String sanitize(final String str) {
return str
// The `-` is used in Streams' thread name as a separator and some tests rely on this.
.replace('-', '_')
.replace(':', '_')
.replace('.', '_')
.replace('[', '_')
.replace(']', '_')
.replace(' ', '_')
.replace('=', '_');
}
/**
* Removes local state stores. Useful to reset state in-between integration test runs.
*
@ -922,7 +898,7 @@ public class IntegrationTestUtils {
* {@link State#RUNNING} state at the same time. Note that states may change between the time
* that this method returns and the calling function executes its next statement.<p>
*
* If the application is already started, use {@link #waitForApplicationState(List, State, Duration)}
* If the application is already started, use {@link org.apache.kafka.streams.utils.TestUtils#waitForApplicationState(List, State, Duration)}
* to wait for instances to reach {@link State#RUNNING} state.
*
* @param streamsList the list of streams instances to run.
@ -991,41 +967,6 @@ public class IntegrationTestUtils {
}
}
/**
* Waits for the given {@link KafkaStreams} instances to all be in a specific {@link State}.
* Prefer {@link #startApplicationAndWaitUntilRunning(List, Duration)} when possible
* because this method uses polling, which can be more error prone and slightly slower.
*
* @param streamsList the list of streams instances to run.
* @param state the expected state that all the streams to be in within timeout
* @param timeout the time to wait for the streams to all be in the specific state.
*
* @throws InterruptedException if the streams doesn't change to the expected state in time.
*/
public static void waitForApplicationState(final List<KafkaStreams> streamsList,
final State state,
final Duration timeout) throws InterruptedException {
retryOnExceptionWithTimeout(timeout.toMillis(), () -> {
final Map<KafkaStreams, State> streamsToStates = streamsList
.stream()
.collect(Collectors.toMap(stream -> stream, KafkaStreams::state));
final Map<KafkaStreams, State> wrongStateMap = streamsToStates.entrySet()
.stream()
.filter(entry -> entry.getValue() != state)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final String reason = String.format(
"Expected all streams instances in %s to be %s within %d ms, but the following were not: %s",
streamsList,
state,
timeout.toMillis(),
wrongStateMap
);
assertThat(reason, wrongStateMap.isEmpty());
});
}
private static class ConsumerGroupInactiveCondition implements TestCondition {
private final Admin adminClient;
private final String applicationId;

View File

@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.kafka=ERROR
log4j.logger.state.change.logger=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.kafka.clients=ERROR
# These are the only logs we will likely ever find anything useful in to debug Streams test failures
log4j.logger.org.apache.kafka.clients.consumer=INFO
log4j.logger.org.apache.kafka.clients.producer=INFO
log4j.logger.org.apache.kafka.streams=INFO
# printing out the configs takes up a huge amount of the allotted characters,
# and provides little value as we can always figure out the test configs without the logs
log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=ERROR
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=ERROR
log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=ERROR
log4j.logger.org.apache.kafka.streams.StreamsConfig=ERROR

View File

@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala
package org.apache.kafka.streams.integration
import org.apache.kafka.streams.integration.utils.StreamToTableJoinScalaIntegrationTestBase
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._
/**
* Test suite that does an example to demonstrate stream-table joins in Kafka Streams
* <p>
@ -125,11 +126,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
@Test
def testShouldCountClicksPerRegionJava(): Unit = {
import java.lang.{Long => JLong}
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, KTable => KTableJ, _}
import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
import java.lang.{Long => JLong}
val streamsConfiguration: Properties = getStreamsConfiguration()
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)

View File

@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala
package org.apache.kafka.streams.integration
import java.util.Properties
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.{MockTime, Utils}
import ImplicitConversions._
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Tag

View File

@ -14,19 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.utils
package org.apache.kafka.streams.integration.utils
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.streams._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api._
import java.io.File
import java.util.Properties
/**
* Test suite base that prepares Kafka cluster for stream-table joins in Kafka Streams

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.utils
package org.apache.kafka.streams.integration.utils
import org.apache.kafka.streams.KeyValue

View File

@ -97,9 +97,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
import org.apache.kafka.streams.kstream.SuppressedTest;
import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest;
import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
@ -48,8 +46,6 @@ import org.junit.platform.suite.api.Suite;
InMemoryTimeOrderedKeyValueChangeBufferTest.class,
TimeOrderedKeyValueBufferTest.class,
FullChangeSerdeTest.class,
SuppressionIntegrationTest.class,
SuppressionDurabilityIntegrationTest.class
})
public class SuppressSuite {
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
@ -39,7 +38,6 @@ import org.junit.platform.suite.api.Suite;
TaskMetricsTest.class,
LegacyStickyTaskAssignorTest.class,
StreamsPartitionAssignorTest.class,
StandbyTaskCreationIntegrationTest.class,
})
public class TaskSuite {
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.utils;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.streams.KafkaStreams;
import org.junit.jupiter.api.TestInfo;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
public class TestUtils {
/**
* Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}.
* This method uses polling, which can be more error prone and slightly slower.
*
* @param streamsList the list of streams instances to run.
* @param state the expected state that all the streams to be in within timeout
* @param timeout the time to wait for the streams to all be in the specific state.
*
* @throws InterruptedException if the streams doesn't change to the expected state in time.
*/
public static void waitForApplicationState(final List<KafkaStreams> streamsList,
final KafkaStreams.State state,
final Duration timeout) throws InterruptedException {
retryOnExceptionWithTimeout(timeout.toMillis(), () -> {
final Map<KafkaStreams, KafkaStreams.State> streamsToStates = streamsList
.stream()
.collect(Collectors.toMap(stream -> stream, KafkaStreams::state));
final Map<KafkaStreams, KafkaStreams.State> wrongStateMap = streamsToStates.entrySet()
.stream()
.filter(entry -> entry.getValue() != state)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final String reason = String.format(
"Expected all streams instances in %s to be %s within %d ms, but the following were not: %s",
streamsList,
state,
timeout.toMillis(),
wrongStateMap
);
assertThat(reason, wrongStateMap.isEmpty());
});
}
public static String safeUniqueTestName(final TestInfo testInfo) {
final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
return safeUniqueTestName(methodName);
}
private static String safeUniqueTestName(final String testName) {
return sanitize(testName + Uuid.randomUuid().toString());
}
private static String sanitize(final String str) {
return str
// The `-` is used in Streams' thread name as a separator and some tests rely on this.
.replace('-', '_')
.replace(':', '_')
.replace('.', '_')
.replace('[', '_')
.replace(']', '_')
.replace(' ', '_')
.replace('=', '_');
}
}