diff --git a/build.gradle b/build.gradle index 33d41be06ba..d062351938a 100644 --- a/build.gradle +++ b/build.gradle @@ -2652,16 +2652,6 @@ project(':streams') { testImplementation project(':metadata') testImplementation project(':clients').sourceSets.test.output - testImplementation project(':server') - testImplementation project(':core') - testImplementation project(':tools') - testImplementation project(':test-common') - testImplementation project(':storage') - testImplementation project(':group-coordinator') - testImplementation project(':transaction-coordinator') - testImplementation project(':server-common') - testImplementation project(':server-common').sourceSets.test.output - testImplementation project(':server') testImplementation libs.reload4j testImplementation libs.junitJupiter testImplementation libs.bcpkix @@ -2669,7 +2659,6 @@ project(':streams') { testImplementation libs.mockitoCore testImplementation libs.mockitoJunitJupiter // supports MockitoExtension testImplementation libs.junitPlatformSuiteEngine // supports suite test - testImplementation project(':group-coordinator') testRuntimeOnly project(':streams:test-utils') testRuntimeOnly runtimeTestLibs @@ -2761,6 +2750,7 @@ project(':streams') { task testAll( dependsOn: [ ':streams:test', + ':streams:integration-tests', ':streams:test-utils:test', ':streams:streams-scala:test', ':streams:upgrade-system-tests-0100:test', @@ -2803,18 +2793,12 @@ project(':streams:streams-scala') { api project(':streams') api libs.scalaLibrary - testImplementation project(':group-coordinator') - testImplementation project(':core') - testImplementation project(':test-common') - testImplementation project(':server-common').sourceSets.test.output testImplementation project(':streams').sourceSets.test.output testImplementation project(':clients').sourceSets.test.output testImplementation project(':streams:test-utils') testImplementation libs.junitJupiter - testImplementation libs.mockitoCore testImplementation libs.mockitoJunitJupiter // supports MockitoExtension - testImplementation libs.hamcrest testRuntimeOnly runtimeTestLibs } @@ -2848,6 +2832,58 @@ project(':streams:streams-scala') { } } +project(':streams:integration-tests') { + apply plugin: 'scala' + + base { + archivesName = "kafka-streams-integration-tests" + } + + dependencies { + testImplementation project(':clients').sourceSets.test.output + testImplementation project(':group-coordinator') + testImplementation project(':server') + testImplementation project(':server-common') + testImplementation project(':server-common').sourceSets.test.output + testImplementation project(':storage') + testImplementation project(':streams').sourceSets.test.output + testImplementation project(':streams:streams-scala') + testImplementation project(':test-common') + testImplementation project(':tools') + testImplementation project(':transaction-coordinator') + testImplementation libs.bcpkix + testImplementation libs.hamcrest + testImplementation libs.junitJupiter + testImplementation libs.junitPlatformSuiteEngine // supports suite test + testImplementation libs.mockitoCore + testImplementation libs.reload4j + testImplementation libs.slf4jApi + testImplementation project(':streams:test-utils') + + testRuntimeOnly runtimeTestLibs + } + + sourceSets { + // Set java/scala source folders in the `scala` block to enable joint compilation + main { + java { + srcDirs = [] + } + scala { + srcDirs = [] + } + } + test { + java { + srcDirs = [] + } + scala { + srcDirs = ["src/test/java", "src/test/scala"] + } + } + } +} + project(':streams:test-utils') { base { archivesName = "kafka-streams-test-utils" diff --git a/settings.gradle b/settings.gradle index 48f7924737e..22db0972542 100644 --- a/settings.gradle +++ b/settings.gradle @@ -85,6 +85,7 @@ include 'clients', 'storage:api', 'streams', 'streams:examples', + 'streams:integration-tests', 'streams:streams-scala', 'streams:test-utils', 'streams:upgrade-system-tests-0100', diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 293c5f5d286..f4e33e36ce8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -153,7 +154,7 @@ public abstract class AbstractResetIntegrationTest { protected static final int TIMEOUT_MULTIPLIER = 30; void prepareTest(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); prepareConfigs(appID); prepareEnvironment(); @@ -194,7 +195,7 @@ public abstract class AbstractResetIntegrationTest { @Test public void testResetWhenInternalTopicsAreSpecified(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -222,7 +223,7 @@ public abstract class AbstractResetIntegrationTest { @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -267,7 +268,7 @@ public abstract class AbstractResetIntegrationTest { cluster.createTopic(INTERMEDIATE_USER_TOPIC); } - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index 916940ce9bf..d01a3acbb1a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreamsWrapper; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.ThreadMetadata; @@ -63,7 +62,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java index be9b058b71d..7b1c02e5d22 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java @@ -58,8 +58,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java index 71b32fe2e59..020436be419 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java @@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; @Timeout(600) @Tag("integration") diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index e82bec4aed1..4ccba92686a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -99,10 +99,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.apache.kafka.test.TestUtils.consumerConfig; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 8f1be1b4f5a..640e438103f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -64,7 +64,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 145ebf34d41..2b34d76f9d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -61,8 +61,8 @@ import java.util.Map; import java.util.Properties; import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java index 3869de0b136..a8ff5cd94ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java @@ -57,7 +57,7 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index d466a22130c..55dcec4b839 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -57,7 +57,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java similarity index 97% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java index 1aeaa45d92c..b7673cd882b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.processor.internals; +package org.apache.kafka.streams.integration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; @@ -40,7 +40,7 @@ import java.io.IOException; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 18407a4e107..df4d981fa71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -71,7 +71,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 555786e81e4..2d76d21ae10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeoutException; import static java.util.Collections.singleton; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -159,7 +160,7 @@ public class IQv2IntegrationTest { ); - final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); } @@ -423,7 +424,7 @@ public class IQv2IntegrationTest { // Discard the basic streams and replace with test-specific topology kafkaStreams.close(); - final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java index ba941910b35..a2d34cb12ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java @@ -63,7 +63,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java index c3059011cbf..413799042e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreamsWrapper; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 158a7c5a4a0..cca699b9493 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -60,7 +60,7 @@ import java.util.Properties; import static java.time.Duration.ofMillis; import static java.time.Duration.ofMinutes; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; /** * Similar to KStreamAggregationIntegrationTest but with dedupping enabled diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 9e2e784c967..97840cd4a8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -90,7 +90,7 @@ import java.util.concurrent.TimeUnit; import static java.time.Duration.ofMillis; import static java.time.Duration.ofMinutes; import static java.time.Instant.ofEpochMilli; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java index 6aa70b77679..23504ef4490 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java @@ -54,9 +54,9 @@ import java.util.Set; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 74e888e5c32..d60a13915b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -76,7 +76,7 @@ import java.util.stream.IntStream; import static org.apache.kafka.streams.KafkaStreams.State.ERROR; import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING; import static org.apache.kafka.streams.KafkaStreams.State.RUNNING; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamTransformIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java index a2b0750550e..2e4faf6551a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -65,10 +65,10 @@ import java.util.Set; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java index 86174d3f8be..637daad1e0c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java @@ -54,7 +54,7 @@ import java.util.function.Function; import java.util.function.Predicate; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(600) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 5f742d95b3c..52dd78ec17e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -53,6 +53,8 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; + @Timeout(600) @Tag("integration") public class KTableSourceTopicRestartIntegrationTest { @@ -99,7 +101,7 @@ public class KTableSourceTopicRestartIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws Exception { - final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); sourceTopic = SOURCE_TOPIC + "-" + safeTestName; CLUSTER.createTopic(sourceTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index bcbc36f3152..97fc2b40ba8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Properties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; @Tag("integration") @Timeout(600) @@ -98,7 +99,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest { public void before(final TestInfo testName) throws Exception { mockTime = CLUSTER.time; - final String appID = IntegrationTestUtils.safeUniqueTestName(testName); + final String appID = safeUniqueTestName(testName); commonClientConfig = new Properties(); commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsWrapper.java similarity index 94% rename from streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsWrapper.java index 8e24e246199..31783776560 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsWrapper.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams; +package org.apache.kafka.streams.integration; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.internals.StreamThread; import java.util.ArrayList; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java index 6c4aaa82a1b..0441b7d49e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreamsWrapper; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.LagInfo; import org.apache.kafka.streams.StreamsBuilder; @@ -67,8 +66,8 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 68e9d8a518e..d083a205e80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -60,7 +60,7 @@ import java.util.List; import java.util.Properties; import java.util.stream.Collectors; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index e08146ebf7a..c9c10f0afaa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -42,7 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 7bc67f57102..8186a5b3049 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -85,9 +85,9 @@ import static java.util.Collections.singleton; import static org.apache.kafka.streams.KeyQueryMetadata.NOT_AVAILABLE; import static org.apache.kafka.streams.KeyValue.pair; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -443,7 +443,7 @@ public class NamedTopologyIntegrationTest { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); streams.addNamedTopology(topology2Builder.build()).all().get(); - IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); + waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); @@ -461,7 +461,7 @@ public class NamedTopologyIntegrationTest { streams.addNamedTopology(topology3Builder.build()).all().get(); - IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); + waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index ed3751ee43b..7d076b7487f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -63,8 +63,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java index 67b25e35f68..29f5276cfb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java @@ -59,10 +59,10 @@ import static java.util.Collections.singletonList; import static org.apache.kafka.streams.KeyValue.pair; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getTopicSize; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilStreamsHasPolled; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 7349f213650..6ae15dcdf03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -104,11 +104,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.apache.kafka.streams.state.QueryableStoreTypes.sessionStore; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index fe82f3360ff..ceb8b653eb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -139,7 +140,7 @@ public class RegexSourceIntegrationTest { properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); streamsConfiguration = StreamsTestUtils.getStreamsConfig( - IntegrationTestUtils.safeUniqueTestName(testInfo), + safeUniqueTestName(testInfo), CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java similarity index 95% rename from streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 13856c58dc9..a197de03ec6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -47,6 +47,7 @@ import java.util.Properties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -99,7 +100,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -123,7 +124,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenInputTopicAbsent(final TestInfo testInfo) { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -139,7 +140,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenIntermediateTopicAbsent(final TestInfo testInfo) { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -155,7 +156,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist(final TestInfo testInfo) { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -171,7 +172,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal(final TestInfo testInfo) { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -187,7 +188,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT * 100)); @@ -224,7 +225,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -266,7 +267,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -313,7 +314,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic(final TestInfo testInfo) throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); + final String appID = safeUniqueTestName(testInfo); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index 8fdee1baa46..83a38ea982e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -55,7 +55,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 5fe61eed66e..92231da7e6e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -100,11 +100,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForActiveRestoringTask; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index cb7e687d119..54aa5dd5f0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -55,8 +55,8 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java index 25c8723e867..d97d85a6af3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java @@ -51,7 +51,7 @@ import java.util.Properties; import static java.time.Duration.ofMinutes; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java index b75b6c517c7..5f972f7c4f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java @@ -69,7 +69,7 @@ import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index 418f78c088d..f7dc074d4b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -47,7 +47,7 @@ import java.time.Duration; import java.util.Properties; import java.util.function.Predicate; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; @Timeout(600) @Tag("integration") diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 38dac86d711..7195133f33b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -58,8 +58,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java index 3c3a3721748..fbed04dd8a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java @@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("integration") diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index f9b586a66bc..da7a9076db7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -72,10 +72,10 @@ import java.util.stream.IntStream; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 557650a6838..f0286d4fe93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -58,7 +58,7 @@ import java.util.Properties; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; @Tag("integration") public class StoreUpgradeIntegrationTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index f25ffca5d9f..deb6b12c1d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -58,7 +58,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 6a61d781f51..2ccc36aab24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -70,9 +70,9 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.fail; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 1442bbcc69b..7cb8108a2de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -70,9 +70,9 @@ import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java index 886908cec71..a82e832e21c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java @@ -56,7 +56,7 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; @Timeout(600) @Tag("integration") diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java index 3dad9f12990..f0fc5477be9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java @@ -50,7 +50,7 @@ import java.util.function.Supplier; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java similarity index 98% rename from streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java index ab36eda4190..a7d1a0c65b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java @@ -53,7 +53,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java index 7c8592f29a4..14a07e569ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java @@ -73,7 +73,7 @@ import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index 8a361d12e27..86dd025e7fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -74,7 +74,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java similarity index 100% rename from streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java similarity index 96% rename from streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java rename to streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 997e2faa603..2d66e0fd86b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Time; @@ -57,14 +56,12 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; @@ -219,27 +216,6 @@ public class IntegrationTestUtils { } } - public static String safeUniqueTestName(final TestInfo testInfo) { - final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName"); - return safeUniqueTestName(methodName); - } - - private static String safeUniqueTestName(final String testName) { - return sanitize(testName + Uuid.randomUuid().toString()); - } - - private static String sanitize(final String str) { - return str - // The `-` is used in Streams' thread name as a separator and some tests rely on this. - .replace('-', '_') - .replace(':', '_') - .replace('.', '_') - .replace('[', '_') - .replace(']', '_') - .replace(' ', '_') - .replace('=', '_'); - } - /** * Removes local state stores. Useful to reset state in-between integration test runs. * @@ -922,7 +898,7 @@ public class IntegrationTestUtils { * {@link State#RUNNING} state at the same time. Note that states may change between the time * that this method returns and the calling function executes its next statement.

* - * If the application is already started, use {@link #waitForApplicationState(List, State, Duration)} + * If the application is already started, use {@link org.apache.kafka.streams.utils.TestUtils#waitForApplicationState(List, State, Duration)} * to wait for instances to reach {@link State#RUNNING} state. * * @param streamsList the list of streams instances to run. @@ -991,41 +967,6 @@ public class IntegrationTestUtils { } } - /** - * Waits for the given {@link KafkaStreams} instances to all be in a specific {@link State}. - * Prefer {@link #startApplicationAndWaitUntilRunning(List, Duration)} when possible - * because this method uses polling, which can be more error prone and slightly slower. - * - * @param streamsList the list of streams instances to run. - * @param state the expected state that all the streams to be in within timeout - * @param timeout the time to wait for the streams to all be in the specific state. - * - * @throws InterruptedException if the streams doesn't change to the expected state in time. - */ - public static void waitForApplicationState(final List streamsList, - final State state, - final Duration timeout) throws InterruptedException { - retryOnExceptionWithTimeout(timeout.toMillis(), () -> { - final Map streamsToStates = streamsList - .stream() - .collect(Collectors.toMap(stream -> stream, KafkaStreams::state)); - - final Map wrongStateMap = streamsToStates.entrySet() - .stream() - .filter(entry -> entry.getValue() != state) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - final String reason = String.format( - "Expected all streams instances in %s to be %s within %d ms, but the following were not: %s", - streamsList, - state, - timeout.toMillis(), - wrongStateMap - ); - assertThat(reason, wrongStateMap.isEmpty()); - }); - } - private static class ConsumerGroupInactiveCondition implements TestCondition { private final Admin adminClient; private final String applicationId; diff --git a/streams/integration-tests/src/test/resources/log4j.properties b/streams/integration-tests/src/test/resources/log4j.properties new file mode 100644 index 00000000000..b7e1fb2d60e --- /dev/null +++ b/streams/integration-tests/src/test/resources/log4j.properties @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.kafka=ERROR +log4j.logger.state.change.logger=ERROR +log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.kafka.clients=ERROR + +# These are the only logs we will likely ever find anything useful in to debug Streams test failures +log4j.logger.org.apache.kafka.clients.consumer=INFO +log4j.logger.org.apache.kafka.clients.producer=INFO +log4j.logger.org.apache.kafka.streams=INFO + +# printing out the configs takes up a huge amount of the allotted characters, +# and provides little value as we can always figure out the test configs without the logs +log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=ERROR +log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=ERROR +log4j.logger.org.apache.kafka.clients.admin.AdminClientConfig=ERROR +log4j.logger.org.apache.kafka.streams.StreamsConfig=ERROR diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala similarity index 97% rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala rename to streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index e9577bcf73c..4cfc811728e 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -14,18 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.scala +package org.apache.kafka.streams.integration + +import org.apache.kafka.streams.integration.utils.StreamToTableJoinScalaIntegrationTestBase +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala.StreamsBuilder +import org.apache.kafka.streams.scala.kstream._ +import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} +import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api._ import java.util.Properties -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} -import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.kstream._ -import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase -import org.junit.jupiter.api._ -import org.junit.jupiter.api.Assertions._ - /** * Test suite that does an example to demonstrate stream-table joins in Kafka Streams *

@@ -125,11 +126,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ @Test def testShouldCountClicksPerRegionJava(): Unit = { - import java.lang.{Long => JLong} - import org.apache.kafka.streams.kstream.{KStream => KStreamJ, KTable => KTableJ, _} import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ} + import java.lang.{Long => JLong} + val streamsConfiguration: Properties = getStreamsConfiguration() streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/WordCountTest.scala similarity index 98% rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala rename to streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/WordCountTest.scala index bd75354204f..3e9813dda24 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala +++ b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/WordCountTest.scala @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.scala +package org.apache.kafka.streams.integration import java.util.Properties import java.util.regex.Pattern import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api._ import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.utils.{MockTime, Utils} -import ImplicitConversions._ import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Tag diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinScalaIntegrationTestBase.scala similarity index 97% rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala rename to streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinScalaIntegrationTestBase.scala index 984cb74a6e2..f3aec5784cd 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala +++ b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinScalaIntegrationTestBase.scala @@ -14,19 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.scala.utils +package org.apache.kafka.streams.integration.utils -import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.streams._ -import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api._ import java.io.File +import java.util.Properties /** * Test suite base that prepares Kafka cluster for stream-table joins in Kafka Streams diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinTestData.scala similarity index 97% rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala rename to streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinTestData.scala index 29d06953e75..4e8a2f024a7 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala +++ b/streams/integration-tests/src/test/scala/org/apache/kafka/streams/integration/utils/StreamToTableJoinTestData.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.scala.utils +package org.apache.kafka.streams.integration.utils import org.apache.kafka.streams.KeyValue diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 519fa9617d8..efe2f26e295 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -97,9 +97,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.not; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java index 2933c42841e..bead622f2c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; -import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest; -import org.apache.kafka.streams.integration.SuppressionIntegrationTest; import org.apache.kafka.streams.kstream.SuppressedTest; import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest; import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest; @@ -48,8 +46,6 @@ import org.junit.platform.suite.api.Suite; InMemoryTimeOrderedKeyValueChangeBufferTest.class, TimeOrderedKeyValueBufferTest.class, FullChangeSerdeTest.class, - SuppressionIntegrationTest.class, - SuppressionDurabilityIntegrationTest.class }) public class SuppressSuite { } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java index 52294138ac1..172321fab11 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskSuite.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest; import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest; import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest; @@ -39,7 +38,6 @@ import org.junit.platform.suite.api.Suite; TaskMetricsTest.class, LegacyStickyTaskAssignorTest.class, StreamsPartitionAssignorTest.class, - StandbyTaskCreationIntegrationTest.class, }) public class TaskSuite { } diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java new file mode 100644 index 00000000000..1a8f8d896dd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.utils; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.streams.KafkaStreams; + +import org.junit.jupiter.api.TestInfo; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; +import static org.hamcrest.MatcherAssert.assertThat; + +public class TestUtils { + /** + * Waits for the given {@link KafkaStreams} instances to all be in a specific {@link KafkaStreams.State}. + * This method uses polling, which can be more error prone and slightly slower. + * + * @param streamsList the list of streams instances to run. + * @param state the expected state that all the streams to be in within timeout + * @param timeout the time to wait for the streams to all be in the specific state. + * + * @throws InterruptedException if the streams doesn't change to the expected state in time. + */ + public static void waitForApplicationState(final List streamsList, + final KafkaStreams.State state, + final Duration timeout) throws InterruptedException { + retryOnExceptionWithTimeout(timeout.toMillis(), () -> { + final Map streamsToStates = streamsList + .stream() + .collect(Collectors.toMap(stream -> stream, KafkaStreams::state)); + + final Map wrongStateMap = streamsToStates.entrySet() + .stream() + .filter(entry -> entry.getValue() != state) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + final String reason = String.format( + "Expected all streams instances in %s to be %s within %d ms, but the following were not: %s", + streamsList, + state, + timeout.toMillis(), + wrongStateMap + ); + assertThat(reason, wrongStateMap.isEmpty()); + }); + } + + public static String safeUniqueTestName(final TestInfo testInfo) { + final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName"); + return safeUniqueTestName(methodName); + } + + private static String safeUniqueTestName(final String testName) { + return sanitize(testName + Uuid.randomUuid().toString()); + } + + private static String sanitize(final String str) { + return str + // The `-` is used in Streams' thread name as a separator and some tests rely on this. + .replace('-', '_') + .replace(':', '_') + .replace('.', '_') + .replace('[', '_') + .replace(']', '_') + .replace(' ', '_') + .replace('=', '_'); + } +}