diff --git a/build.gradle b/build.gradle index 0907f0506a4..0cd97d23186 100644 --- a/build.gradle +++ b/build.gradle @@ -2528,6 +2528,7 @@ project(':streams') { // testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle testCompileOnly project(':streams:test-utils') + testImplementation project(':metadata') testImplementation project(':clients').sourceSets.test.output testImplementation project(':server') testImplementation project(':core') diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index f7d7ef798bd..487d49f0466 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -393,6 +393,8 @@ + + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index b92e2ad135a..293c5f5d286 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -150,7 +150,7 @@ public abstract class AbstractResetIntegrationTest { protected static final int STREAMS_CONSUMER_TIMEOUT = 2000; protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000; - protected static final int TIMEOUT_MULTIPLIER = 15; + protected static final int TIMEOUT_MULTIPLIER = 30; void prepareTest(final TestInfo testInfo) throws Exception { final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); @@ -159,7 +159,7 @@ public abstract class AbstractResetIntegrationTest { waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT); - cluster.deleteAllTopicsAndWait(120000); + cluster.deleteAllTopics(); cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); add10InputElements(); @@ -199,7 +199,7 @@ public abstract class AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true, OUTPUT_TOPIC_2), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -272,7 +272,7 @@ public abstract class AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); // receive only first values to make sure intermediate user topic is not consumed completely // => required to test "seekToEnd" for intermediate topics @@ -301,7 +301,7 @@ public abstract class AbstractResetIntegrationTest { assertInternalTopicsGotDeleted(useRepartitioned ? null : INTERMEDIATE_USER_TOPIC); // RE-RUN - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); final List> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40); streams.close(); @@ -323,7 +323,7 @@ public abstract class AbstractResetIntegrationTest { cleanGlobal(!useRepartitioned, null, null, appID); if (!useRepartitioned) { - cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); + cluster.deleteTopic(INTERMEDIATE_USER_TOPIC); } } @@ -420,7 +420,6 @@ public abstract class AbstractResetIntegrationTest { } protected void assertInternalTopicsGotDeleted(final String additionalExistingTopic) throws Exception { - // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm if (additionalExistingTopic != null) { cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index 4feaa2c6283..916940ce9bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -107,7 +107,7 @@ public class AdjustStreamThreadCountTest { builder = new StreamsBuilder(); builder.stream(inputTopic); - properties = mkObjectProperties( + properties = mkObjectProperties( mkMap( mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index b5651e889ef..2b924e03727 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -162,8 +162,7 @@ public class EosIntegrationTest { @BeforeEach public void createTopics() throws Exception { applicationId = "appId-" + TEST_NUMBER.getAndIncrement(); - CLUSTER.deleteTopicsAndWait( - 60_000L, + CLUSTER.deleteTopics( SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 52a3e839c21..b50cf4ad62c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkObjectProperties; @@ -81,19 +80,11 @@ import static org.hamcrest.Matchers.is; @Tag("integration") public class HighAvailabilityTaskAssignorIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, - new Properties(), - asList( - new Properties() {{ - setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0); - }}, - new Properties() {{ - setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1); - }}, - new Properties() {{ - setProperty(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2); - }} - ) - ); + new Properties(), mkMap( + mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_0))), + mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_1))), + mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG, AssignmentTestUtils.RACK_2))) + )); @BeforeAll public static void startCluster() throws IOException { @@ -258,7 +249,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest { restoreCompleteLatch.await(); // We should finalize the restoration without having restored any records (because they're already in - // the store. Otherwise, we failed to properly re-use the state from the standby. + // the store). Otherwise, we failed to properly re-use the state from the standby. assertThat(instance1TotalRestored.get(), is(0L)); // Belt-and-suspenders check that we never even attempt to restore any records. assertThat(instance1NumRestored.get(), is(-1L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 425f1eb207f..be31768aa4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -379,7 +379,7 @@ public class IQv2StoreIntegrationTest { throws InterruptedException, IOException, ExecutionException, TimeoutException { CLUSTER.start(); - CLUSTER.deleteAllTopicsAndWait(60 * 1000L); + CLUSTER.deleteAllTopics(); final int partitions = 2; CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java index 1ac11194f5f..61cbca6e3d2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -70,12 +71,8 @@ import static org.hamcrest.MatcherAssert.assertThat; @Timeout(600) public class JoinGracePeriodDurabilityIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( - 3, - mkProperties(mkMap()), - 0L - ); - + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + private static final long NOW = Instant.now().toEpochMilli(); @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); @@ -218,7 +215,7 @@ public class JoinGracePeriodDurabilityIntegrationTest { * just to exercise that everything works properly in the presence of commits. */ private long scaledTime(final long unscaledTime) { - return COMMIT_INTERVAL * 2 * unscaledTime; + return NOW + COMMIT_INTERVAL * 2 * unscaledTime; } private static void produceSynchronouslyToPartitionZero(final String topic, final List> toProduce) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java index c71ac31672a..1041f323ae1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java @@ -97,7 +97,7 @@ public class JoinStoreIntegrationTest { @AfterEach public void cleanup() throws InterruptedException, IOException { - CLUSTER.deleteAllTopicsAndWait(120000); + CLUSTER.deleteAllTopics(); IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java index 58b00f81d53..c3059011cbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java @@ -84,7 +84,7 @@ public class JoinWithIncompleteMetadataIntegrationTest { @AfterEach public void cleanup() throws InterruptedException, IOException { - CLUSTER.deleteAllTopicsAndWait(120000); + CLUSTER.deleteAllTopics(); IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 35677aeaef2..5f742d95b3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -120,8 +120,7 @@ public class KTableSourceTopicRestartIntegrationTest { @Test public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception { try { - streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); - streams.start(); + streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false); produceKeyValues("a", "b", "c"); @@ -131,7 +130,7 @@ public class KTableSourceTopicRestartIntegrationTest { streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); // the state restore listener will append one record to the log streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener()); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); produceKeyValues("f", "g", "h"); @@ -149,8 +148,7 @@ public class KTableSourceTopicRestartIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); try { - streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); - streams.start(); + streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false); produceKeyValues("a", "b", "c"); @@ -160,7 +158,7 @@ public class KTableSourceTopicRestartIntegrationTest { streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); // the state restore listener will append one record to the log streams.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener()); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); produceKeyValues("f", "g", "h"); @@ -176,16 +174,14 @@ public class KTableSourceTopicRestartIntegrationTest { @Test public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception { try { - streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); - streams.start(); + streams = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG, streamsBuilder, false); produceKeyValues("a", "b", "c"); assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values"); streams.close(); - streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); - streams.start(); + streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG, streamsBuilder, false); produceKeyValues("f", "g", "h"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index d7ce484ba55..bcbc36f3152 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -135,7 +135,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest { adminClient = Admin.create(commonClientConfig); } - CLUSTER.deleteAllTopicsAndWait(120_000L); + CLUSTER.deleteAllTopics(); CLUSTER.createTopic(INPUT_TOPIC, 2, 1); CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 4e341b07234..5f0fd659db7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -97,7 +97,7 @@ import static org.hamcrest.MatcherAssert.assertThat; @Timeout(600) @Tag("integration") public class NamedTopologyIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); private static final String TOPOLOGY_1 = "topology-1"; private static final String TOPOLOGY_2 = "topology-2"; @@ -243,14 +243,14 @@ public class NamedTopologyIntegrationTest { CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> { try { assertThat("topic was not decorated", t.contains(TOPIC_PREFIX)); - CLUSTER.deleteTopicsAndWait(t); - } catch (final InterruptedException e) { + CLUSTER.deleteTopics(t); + } catch (final RuntimeException e) { e.printStackTrace(); } }); - CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3); - CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3); + CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); } @Test @@ -518,8 +518,8 @@ public class NamedTopologyIntegrationTest { CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog")).forEach(t -> { try { - CLUSTER.deleteTopicAndWait(t); - } catch (final InterruptedException e) { + CLUSTER.deleteTopic(t); + } catch (final RuntimeException e) { e.printStackTrace(); } }); @@ -570,7 +570,7 @@ public class NamedTopologyIntegrationTest { assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); } finally { - CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1); } } @@ -624,8 +624,8 @@ public class NamedTopologyIntegrationTest { CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("changelog")).forEach(t -> { try { - CLUSTER.deleteTopicAndWait(t); - } catch (final InterruptedException e) { + CLUSTER.deleteTopic(t); + } catch (final RuntimeException e) { e.printStackTrace(); } }); @@ -640,7 +640,7 @@ public class NamedTopologyIntegrationTest { assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); } finally { - CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); } } @@ -662,8 +662,8 @@ public class NamedTopologyIntegrationTest { CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> { try { - CLUSTER.deleteTopicsAndWait(t); - } catch (final InterruptedException e) { + CLUSTER.deleteTopics(t); + } catch (final RuntimeException e) { e.printStackTrace(); } }); @@ -678,7 +678,7 @@ public class NamedTopologyIntegrationTest { assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); - CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); + CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java index 044acfac8b0..bc21a2a8b6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java @@ -102,7 +102,7 @@ import static org.hamcrest.Matchers.is; public class PositionRestartIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(PositionRestartIntegrationTest.class); private static final long SEED = new Random().nextLong(); - private static final int NUM_BROKERS = 1; + private static final int NUM_BROKERS = 3; public static final Duration WINDOW_SIZE = Duration.ofMinutes(5); private static int port = 0; private static final String INPUT_TOPIC_NAME = "input-topic"; @@ -274,7 +274,7 @@ public class PositionRestartIntegrationTest { throws InterruptedException, IOException, ExecutionException, TimeoutException { CLUSTER.start(); - CLUSTER.deleteAllTopicsAndWait(60 * 1000L); + CLUSTER.deleteAllTopics(); final int partitions = 2; CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index f16c3e57c39..2f84b0b4f4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -242,7 +242,7 @@ public class QueryableStateIntegrationTest { kafkaStreams.close(ofSeconds(30)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - CLUSTER.deleteAllTopicsAndWait(0L); + CLUSTER.deleteAllTopics(); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java index 477024dce53..6842c7718e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java @@ -153,7 +153,7 @@ public class RangeQueryIntegrationTest { @AfterEach public void cleanup() throws InterruptedException { - CLUSTER.deleteAllTopicsAndWait(120000); + CLUSTER.deleteAllTopics(); } @ParameterizedTest diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index ae953020520..7fff8099a16 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -196,7 +196,7 @@ public class RegexSourceIntegrationTest { streams.close(); } finally { - CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); + CLUSTER.deleteTopics("TEST-TOPIC-1", "TEST-TOPIC-2"); } } @@ -248,7 +248,7 @@ public class RegexSourceIntegrationTest { streams.close(); } finally { - CLUSTER.deleteTopicsAndWait(topic1, topic2); + CLUSTER.deleteTopics(topic1, topic2); } } @@ -290,7 +290,7 @@ public class RegexSourceIntegrationTest { streams.start(); TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); } finally { - CLUSTER.deleteTopicAndWait("TEST-TOPIC-A"); + CLUSTER.deleteTopic("TEST-TOPIC-A"); } TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 1c8599a3b19..13856c58dc9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -68,7 +68,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially // very long sleep times brokerProps.put(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, -1L); - CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); + CLUSTER = new EmbeddedKafkaCluster(3, brokerProps); } @BeforeAll @@ -98,7 +98,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { } @Test - public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) { + public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo testInfo) throws Exception { final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo); final String[] parameters = new String[] { "--application-id", appID, @@ -113,7 +113,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final int exitCode = new StreamsResetter().execute(parameters, cleanUpConfig); assertEquals(1, exitCode); @@ -193,7 +193,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // Run streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -213,7 +214,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { assertInternalTopicsGotDeleted(null); // RE-RUN - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -228,7 +229,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -251,7 +253,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { resetFile.deleteOnExit(); // RE-RUN - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 5); streams.close(); @@ -269,7 +271,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -297,7 +300,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { resetFile.deleteOnExit(); // RE-RUN - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -314,7 +318,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { // RUN streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); @@ -337,7 +342,8 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { resetFile.deleteOnExit(); // RE-RUN - streams.start(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + final List> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java index 65a99a0b1b5..7f47233361e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java @@ -33,8 +33,6 @@ import java.io.IOException; import java.util.Map; import java.util.Properties; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; - /** * Tests command line SSL setup for reset tool. */ @@ -54,9 +52,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { try { SSL_CONFIG = TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert"); - - brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0"); - brokerProps.put(INTER_BROKER_LISTENER_NAME_CONFIG, "SSL"); + brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,CONTROLLER:SSL,INTERNAL:SSL"); brokerProps.putAll(SSL_CONFIG); } catch (final Exception e) { throw new RuntimeException(e); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index a0cedcb2840..cff0d74fcad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -44,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -64,13 +65,15 @@ import static org.hamcrest.MatcherAssert.assertThat; public class ResetPartitionTimeIntegrationTest { private static final int NUM_BROKERS = 1; private static final Properties BROKER_CONFIG; + private static final long NOW = Instant.now().toEpochMilli(); + static { BROKER_CONFIG = new Properties(); BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1); BROKER_CONFIG.put("transaction.state.log.min.isr", 1); } public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L); + new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); @BeforeAll public static void startCluster() throws IOException { @@ -117,13 +120,13 @@ public class ResetPartitionTimeIntegrationTest { produceSynchronouslyToPartitionZero( input, Collections.singletonList( - new KeyValueTimestamp<>("k3", "v3", 5000) + new KeyValueTimestamp<>("k3", "v3", NOW + 5000) ) ); verifyOutput( outputRaw, Collections.singletonList( - new KeyValueTimestamp<>("k3", "v3", 5000) + new KeyValueTimestamp<>("k3", "v3", NOW + 5000) ) ); assertThat(lastRecordedTimestamp, is(-1L)); @@ -138,16 +141,16 @@ public class ResetPartitionTimeIntegrationTest { produceSynchronouslyToPartitionZero( input, Collections.singletonList( - new KeyValueTimestamp<>("k5", "v5", 4999) + new KeyValueTimestamp<>("k5", "v5", NOW + 4999) ) ); verifyOutput( outputRaw, Collections.singletonList( - new KeyValueTimestamp<>("k5", "v5", 4999) + new KeyValueTimestamp<>("k5", "v5", NOW + 4999) ) ); - assertThat(lastRecordedTimestamp, is(5000L)); + assertThat(lastRecordedTimestamp, is(NOW + 5000L)); } finally { kafkaStreams.close(); quietlyCleanStateAfterTest(CLUSTER, kafkaStreams); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 636c9c52f05..5fe61eed66e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -117,7 +117,7 @@ public class RestoreIntegrationTest { private static final Duration RESTORATION_DELAY = Duration.ofMillis(2000); - private static final int NUM_BROKERS = 1; + private static final int NUM_BROKERS = 2; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 5fd98bbd7d6..22ac5e5408e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -135,7 +135,7 @@ public class RocksDBMetricsIntegrationTest { @AfterEach public void after() throws Exception { - CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO); + CLUSTER.deleteTopics(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO); } @FunctionalInterface diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index f910043d1e0..1c3bd11957e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -104,7 +104,7 @@ public class StandbyTaskEOSIntegrationTest { inputTopic = "input-" + safeTestName; outputTopic = "output-" + safeTestName; storeName = "store-" + safeTestName; - CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"); + CLUSTER.deleteTopics(inputTopic, outputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"); CLUSTER.createTopic(inputTopic, 1, 3); CLUSTER.createTopic(outputTopic, 1, 3); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java index 66f5b5f0209..e4d355cdabe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java @@ -18,9 +18,11 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; @@ -99,7 +101,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest { storeName = "store-" + safeTestName; counterName = "counter-" + safeTestName; - CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic); + CLUSTER.deleteTopics(inputTopic, outputTopic); CLUSTER.createTopic(inputTopic, partitionCount, 3); CLUSTER.createTopic(outputTopic, partitionCount, 3); } @@ -141,7 +143,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest { CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, - new Properties() + Utils.mkProperties(Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all")) ), 10L + time ); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java index df4359228c6..3c3a3721748 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java @@ -171,7 +171,7 @@ public class StateDirectoryIntegrationTest { ).findFirst().isPresent() ); } finally { - CLUSTER.deleteAllTopicsAndWait(0L); + CLUSTER.deleteAllTopics(); } } @@ -271,7 +271,7 @@ public class StateDirectoryIntegrationTest { assertTrue((new File(stateDir)).exists()); // Root state store exists assertTrue(appDir.exists()); // Application state store exists } finally { - CLUSTER.deleteAllTopicsAndWait(0L); + CLUSTER.deleteAllTopics(); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 476c1857422..9ab5c7d574c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -80,8 +81,9 @@ import static org.junit.jupiter.api.Assertions.fail; @Tag("integration") @Timeout(600) public class StreamsUncaughtExceptionHandlerIntegrationTest { + private static final long NOW = Instant.now().toEpochMilli(); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @BeforeAll public static void startCluster() throws IOException { @@ -146,7 +148,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet()); startApplicationAndWaitUntilRunning(kafkaStreams); - produceMessages(0L, inputTopic, "A"); + produceMessages(NOW, inputTopic, "A"); // should call the UncaughtExceptionHandler in current thread TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time"); @@ -168,7 +170,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { startApplicationAndWaitUntilRunning(kafkaStreams); - produceMessages(0L, inputTopic, "A"); + produceMessages(NOW, inputTopic, "A"); waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); @@ -252,7 +254,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { startApplicationAndWaitUntilRunning(kafkaStreams); - produceMessages(0L, inputTopic2, "A"); + produceMessages(NOW, inputTopic2, "A"); waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); @@ -297,7 +299,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { IntegerSerializer.class, StringSerializer.class, new Properties()), - 0L); + NOW); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( inputTopic2, @@ -310,7 +312,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { IntegerSerializer.class, StringSerializer.class, new Properties()), - 0L); + NOW); IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( TestUtils.consumerConfig( @@ -365,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2)); - produceMessages(0L, inputTopic, "A"); + produceMessages(NOW, inputTopic, "A"); waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION); assertThat(processorValueCollector.size(), equalTo(1)); @@ -386,7 +388,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { }); startApplicationAndWaitUntilRunning(kafkaStreams); - produceMessages(0L, inputTopic, "A"); + produceMessages(NOW, inputTopic, "A"); TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads"); TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads"); kafkaStreams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 9215a0095e1..47d37f76d8c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -55,11 +55,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashSet; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.Properties; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -82,12 +81,8 @@ import static org.hamcrest.Matchers.equalTo; @Tag("integration") @Timeout(600) public class SuppressionDurabilityIntegrationTest { - - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( - 3, - mkProperties(mkMap()), - 0L - ); + private static final long NOW = Instant.now().toEpochMilli(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); @BeforeAll public static void startCluster() throws IOException { @@ -173,11 +168,11 @@ public class SuppressionDurabilityIntegrationTest { ); verifyOutput( outputRaw, - new HashSet<>(asList( + asList( new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)), new KeyValueTimestamp<>("k2", 1L, scaledTime(2L)), new KeyValueTimestamp<>("k3", 1L, scaledTime(3L)) - )) + ) ); assertThat(eventCount.get(), is(0)); @@ -191,10 +186,10 @@ public class SuppressionDurabilityIntegrationTest { ); verifyOutput( outputRaw, - new HashSet<>(asList( + asList( new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)), new KeyValueTimestamp<>("k5", 1L, scaledTime(5L)) - )) + ) ); assertThat(eventCount.get(), is(2)); verifyOutput( @@ -225,11 +220,11 @@ public class SuppressionDurabilityIntegrationTest { ); verifyOutput( outputRaw, - new HashSet<>(asList( + asList( new KeyValueTimestamp<>("k6", 1L, scaledTime(6L)), new KeyValueTimestamp<>("k7", 1L, scaledTime(7L)), new KeyValueTimestamp<>("k8", 1L, scaledTime(8L)) - )) + ) ); assertThat("suppress has apparently produced some duplicates. There should only be 5 output events.", eventCount.get(), is(5)); @@ -303,24 +298,12 @@ public class SuppressionDurabilityIntegrationTest { IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps); } - private void verifyOutput(final String topic, final Set> keyValueTimestamps) { - final Properties properties = mkProperties( - mkMap( - mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), - mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer) STRING_DESERIALIZER).getClass().getName()), - mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer) LONG_DESERIALIZER).getClass().getName()) - ) - ); - IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps); - } - /** * scaling to ensure that there are commits in between the various test events, * just to exercise that everything works properly in the presence of commits. */ private long scaledTime(final long unscaledTime) { - return COMMIT_INTERVAL * 2 * unscaledTime; + return NOW + COMMIT_INTERVAL * 2 * unscaledTime; } private static void produceSynchronouslyToPartitionZero(final String topic, final List> toProduce) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 9b9722e8539..3d7c141129f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -83,12 +84,8 @@ import static org.hamcrest.Matchers.empty; @Tag("integration") @Timeout(600) public class SuppressionIntegrationTest { - - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( - 1, - mkProperties(mkMap()), - 0L - ); + private static final long NOW = Instant.now().toEpochMilli(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @BeforeAll public static void startCluster() throws IOException { @@ -525,7 +522,7 @@ public class SuppressionIntegrationTest { * just to exercise that everything works properly in the presence of commits. */ private static long scaledTime(final long unscaledTime) { - return COMMIT_INTERVAL * 2 * unscaledTime; + return NOW + COMMIT_INTERVAL * 2 * unscaledTime; } private static void produceSynchronously(final String topic, final List> toProduce) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java index 0238dcae621..9f025ee414a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java @@ -61,7 +61,7 @@ import static org.hamcrest.MatcherAssert.assertThat; @Timeout(600) public class TaskMetadataIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap(), 0L, 0L); @BeforeAll public static void startCluster() throws IOException { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f4f03b98330..16334714168 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -16,19 +16,36 @@ */ package org.apache.kafka.streams.integration.utils; -import kafka.server.KafkaServer; -import kafka.zk.EmbeddedZookeeper; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ConfigType; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; -import org.apache.kafka.server.config.ZkConfigs; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.test.TestCondition; @@ -37,115 +54,143 @@ import org.apache.kafka.test.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; +import java.time.Duration; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.utils.Utils.mkProperties; /** - * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers. + * Setup an embedded Kafka KRaft cluster for integration tests (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. + * Additional Kafka client properties can also be supplied if required. + * This class also provides various utility methods to easily create Kafka topics, produce data, consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); - private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected - private static final int TOPIC_CREATION_TIMEOUT = 30000; - private static final int TOPIC_DELETION_TIMEOUT = 30000; - private EmbeddedZookeeper zookeeper = null; - private final KafkaEmbedded[] brokers; - + private final KafkaClusterTestKit cluster; private final Properties brokerConfig; - private final List brokerConfigOverrides; public final MockTime time; - public EmbeddedKafkaCluster(final int numBrokers) { this(numBrokers, new Properties()); } - public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig) { - this(numBrokers, brokerConfig, System.currentTimeMillis()); + public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { + this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig, final long mockTimeMillisStart) { - this(numBrokers, brokerConfig, Collections.emptyList(), mockTimeMillisStart); + this(numBrokers, brokerConfig, Collections.emptyMap(), mockTimeMillisStart, System.nanoTime()); } - public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig, - final List brokerConfigOverrides) { - this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis()); + final Map> brokerConfigOverrides) { + this(numBrokers, brokerConfig, brokerConfigOverrides, System.currentTimeMillis(), System.nanoTime()); } - public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig, - final List brokerConfigOverrides, - final long mockTimeMillisStart) { - this(numBrokers, brokerConfig, brokerConfigOverrides, mockTimeMillisStart, System.nanoTime()); - } - - public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig, - final List brokerConfigOverrides, + final Map> brokerConfigOverrides, final long mockTimeMillisStart, final long mockTimeNanoStart) { + addDefaultBrokerPropsIfAbsent(brokerConfig); + if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) { throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size() - + " must match broker number " + numBrokers); + + " must match broker number " + numBrokers); + } + try { + final KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(numBrokers) + .setPerServerProperties(brokerConfigOverrides) + // Reduce number of controllers for faster startup + // We may make this configurable in the future if there's a use case for it + .setNumControllerNodes(1) + .build() + ); + + brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v)); + cluster = clusterBuilder.build(); + cluster.nonFatalFaultHandler().setIgnore(true); + } catch (final Exception e) { + throw new KafkaException("Failed to create test Kafka cluster", e); } - brokers = new KafkaEmbedded[numBrokers]; this.brokerConfig = brokerConfig; - time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); - this.brokerConfigOverrides = brokerConfigOverrides; + this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); + } + + public void start() { + try { + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + } catch (final Exception e) { + throw new KafkaException("Failed to start test Kafka cluster", e); + } + + verifyClusterReadiness(); } /** - * Creates and starts a Kafka cluster. + * Perform an extended check to ensure that the primary APIs of the cluster are available, including: + *
    + *
  • Ability to create a topic
  • + *
  • Ability to produce to a topic
  • + *
  • Ability to form a consumer group
  • + *
  • Ability to consume from a topic
  • + *
+ * If this method completes successfully, all resources created to verify the cluster health + * (such as topics and consumer groups) will be cleaned up before it returns. + *

+ * This provides extra guarantees compared to other cluster readiness checks such as + * {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have + * completed startup and joined the cluster, but do not verify that the internal consumer + * offsets topic has been created or that it's actually possible for users to create and + * interact with topics. */ - public void start() throws IOException { - log.debug("Initiating embedded Kafka cluster startup"); - log.debug("Starting a ZooKeeper instance"); - zookeeper = new EmbeddedZookeeper(); - log.debug("ZooKeeper instance is running at {}", zKConnectString()); + public void verifyClusterReadiness() { + final UUID uuid = UUID.randomUUID(); + final String consumerGroupId = "group-warmup-" + uuid; + final Map consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId); + final String topic = "topic-warmup-" + uuid; - brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString()); - putIfAbsent(brokerConfig, SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT); - putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true); - putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 0); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5); - putIfAbsent(brokerConfig, TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5); - putIfAbsent(brokerConfig, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true); + createTopic(topic); + final Map producerProps = new HashMap<>(clientDefaultConfig()); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "warmup-producer"); + produce(producerProps, topic, null, "warmup message key", "warmup message value"); - for (int i = 0; i < brokers.length; i++) { - brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i); - log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG)); - - final Properties effectiveConfig = new Properties(); - effectiveConfig.putAll(brokerConfig); - if (brokerConfigOverrides != null && brokerConfigOverrides.size() > i) { - effectiveConfig.putAll(brokerConfigOverrides.get(i)); + try (Consumer consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(TimeUnit.MINUTES.toMillis(2))); + if (records.isEmpty()) { + throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time"); } - brokers[i] = new KafkaEmbedded(effectiveConfig, time); - - log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", - brokers[i].brokerList(), brokers[i].zookeeperConnect()); } - } - private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { - if (!props.containsKey(propertyKey)) { - brokerConfig.put(propertyKey, propertyValue); + try (Admin admin = createAdminClient()) { + admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS); + admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException e) { + throw new AssertionError("Failed to clean up cluster health check resource(s)", e); } } @@ -153,46 +198,22 @@ public class EmbeddedKafkaCluster { * Stop the Kafka cluster. */ public void stop() { - if (brokers.length > 1) { - // delete the topics first to avoid cascading leader elections while shutting down the brokers - final Set topics = getAllTopicsInCluster(); - if (!topics.isEmpty()) { - try (final Admin adminClient = brokers[0].createAdminClient()) { - adminClient.deleteTopics(topics).all().get(); - } catch (final InterruptedException e) { - log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e); - throw new RuntimeException(e); - } catch (final ExecutionException | RuntimeException e) { - log.warn("Couldn't delete all topics before stopping brokers", e); - } - } + final AtomicReference shutdownFailure = new AtomicReference<>(); + Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure); + if (shutdownFailure.get() != null) { + throw new KafkaException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get()); } - for (final KafkaEmbedded broker : brokers) { - broker.stopAsync(); - } - for (final KafkaEmbedded broker : brokers) { - broker.awaitStoppedAndPurge(); - } - zookeeper.shutdown(); } - /** - * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. - *

- * You can use this to e.g. tell Kafka brokers how to connect to this instance. - */ - public String zKConnectString() { - return "127.0.0.1:" + zookeeper.port(); - } - - /** - * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. - *

- * You can use this to tell Kafka producers how to connect to this cluster. - */ public String bootstrapServers() { - return brokers[0].brokerList(); + return cluster.bootstrapServers(); + } + + public boolean sslEnabled() { + final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG); + if (listenerSecurityProtocolMap == null) + return false; + return listenerSecurityProtocolMap.contains(":SSL") || listenerSecurityProtocolMap.contains(":SASL_SSL"); } /** @@ -211,8 +232,18 @@ public class EmbeddedKafkaCluster { * * @param topic The name of the topic. */ - public void createTopic(final String topic) throws InterruptedException { - createTopic(topic, 1, 1, Collections.emptyMap()); + public void createTopic(final String topic) { + createTopic(topic, 1); + } + + /** + * Create a Kafka topic with given partition and a replication factor of 1. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + */ + public void createTopic(final String topic, final int partitions) { + createTopic(topic, partitions, 1, Collections.emptyMap()); } /** @@ -227,116 +258,177 @@ public class EmbeddedKafkaCluster { } /** - * Create a Kafka topic with the given parameters. + * Create a Kafka topic with given partition, replication factor, and topic config. * - * @param topic The name of the topic. + * @param topic The name of the topic. * @param partitions The number of partitions for this topic. * @param replication The replication factor for (partitions of) this topic. * @param topicConfig Additional topic-level configuration settings. */ - public void createTopic(final String topic, - final int partitions, - final int replication, - final Map topicConfig) throws InterruptedException { - brokers[0].createTopic(topic, partitions, replication, topicConfig); - final List topicPartitions = new ArrayList<>(); - for (int partition = 0; partition < partitions; partition++) { - topicPartitions.add(new TopicPartition(topic, partition)); + public void createTopic(final String topic, final int partitions, final int replication, final Map topicConfig) { + if (replication > cluster.brokers().size()) { + throw new InvalidReplicationFactorException("Insufficient brokers (" + + cluster.brokers().size() + ") for desired replication (" + replication + ")"); + } + + log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", + topic, partitions, replication, topicConfig); + final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication); + newTopic.configs(topicConfig); + + try (final Admin adminClient = createAdminClient()) { + adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); + TestUtils.waitForCondition(() -> adminClient.listTopics().names().get().contains(topic), + "Wait for topic " + topic + " to get created."); + } catch (final TopicExistsException ignored) { + } catch (final InterruptedException | ExecutionException e) { + if (!(e.getCause() instanceof TopicExistsException)) { + throw new RuntimeException(e); + } } - IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT); } - /** - * Deletes a topic returns immediately. - * - * @param topic the name of the topic - */ - public void deleteTopic(final String topic) throws InterruptedException { - deleteTopicsAndWait(-1L, topic); - } - - /** - * Deletes a topic and blocks for max 30 sec until the topic got deleted. - * - * @param topic the name of the topic - */ - public void deleteTopicAndWait(final String topic) throws InterruptedException { - deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic); - } - - /** - * Deletes multiple topics returns immediately. - * - * @param topics the name of the topics - */ - public void deleteTopics(final String... topics) throws InterruptedException { - deleteTopicsAndWait(-1, topics); - } - - /** - * Deletes multiple topics and blocks for max 30 sec until all topics got deleted. - * - * @param topics the name of the topics - */ - public void deleteTopicsAndWait(final String... topics) throws InterruptedException { - deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics); - } - - /** - * Deletes multiple topics and blocks until all topics got deleted. - * - * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0}) - * @param topics the name of the topics - */ - public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException { + public void deleteTopics(final String... topics) { for (final String topic : topics) { - try { - brokers[0].deleteTopic(topic); - } catch (final UnknownTopicOrPartitionException ignored) { } + deleteTopic(topic); } + } - if (timeoutMs > 0) { - TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds."); + + /** + * Delete a Kafka topic. + * + * @param topic the topic to delete; may not be null + */ + public void deleteTopic(final String topic) { + try (final Admin adminClient = createAdminClient()) { + adminClient.deleteTopics(Collections.singleton(topic)).all().get(); + } catch (final InterruptedException | ExecutionException e) { + if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { + throw new RuntimeException(e); + } } } /** - * Deletes all topics and blocks until all topics got deleted. - * - * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0}) + * Delete all topics except internal topics. */ - public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException { - final Set topics = getAllTopicsInCluster(); - for (final String topic : topics) { - try { - brokers[0].deleteTopic(topic); - } catch (final UnknownTopicOrPartitionException ignored) { } + public void deleteAllTopics() { + try (final Admin adminClient = createAdminClient()) { + final Set topics = adminClient.listTopics().names().get(); + adminClient.deleteTopics(topics).all().get(); + } catch (final UnknownTopicOrPartitionException ignored) { + } catch (final ExecutionException | InterruptedException e) { + if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { + throw new RuntimeException(e); + } } + } - if (timeoutMs > 0) { - TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds."); + /** + * Produce given key and value to topic partition. + * @param topic the topic to produce to; may not be null. + * @param partition the topic partition to produce to. + * @param key the record key. + * @param value the record value. + */ + public void produce(final Map producerProps, final String topic, final Integer partition, final String key, final String value) { + try (KafkaProducer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) { + final ProducerRecord msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes()); + try { + producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS); + producer.flush(); + } catch (final Exception e) { + throw new KafkaException("Could not produce message: " + msg, e); + } } } + public Admin createAdminClient() { + return Admin.create(mkProperties(clientDefaultConfig())); + } + + public Map clientDefaultConfig() { + final Map props = new HashMap<>(); + props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + if (sslEnabled()) { + props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG).toString()); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); + props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + } + return props; + } + + public KafkaConsumer createConsumer(final Map consumerProps) { + final Map props = new HashMap<>(clientDefaultConfig()); + props.putAll(consumerProps); + + props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + final KafkaConsumer consumer; + try { + consumer = new KafkaConsumer<>(props); + } catch (final Throwable t) { + throw new KafkaException("Failed to create consumer", t); + } + return consumer; + } + + public KafkaConsumer createConsumerAndSubscribeTo(final Map consumerProps, final String... topics) { + return createConsumerAndSubscribeTo(consumerProps, null, topics); + } + + public KafkaConsumer createConsumerAndSubscribeTo(final Map consumerProps, final ConsumerRebalanceListener rebalanceListener, final String... topics) { + final KafkaConsumer consumer = createConsumer(consumerProps); + if (rebalanceListener != null) { + consumer.subscribe(Arrays.asList(topics), rebalanceListener); + } else { + consumer.subscribe(Arrays.asList(topics)); + } + return consumer; + } + + private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) { + brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "5"); + brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true); + brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true); + } + public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException { TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds."); } - private final class TopicsDeletedCondition implements TestCondition { - final Set deletedTopics = new HashSet<>(); - - private TopicsDeletedCondition(final String... topics) { - Collections.addAll(deletedTopics, topics); + public Set getAllTopicsInCluster() { + try (final Admin adminClient = createAdminClient()) { + return adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } + } - private TopicsDeletedCondition(final Collection topics) { - deletedTopics.addAll(topics); - } - - @Override - public boolean conditionMet() { - final Set allTopics = getAllTopicsInCluster(); - return !allTopics.removeAll(deletedTopics); + public Properties getLogConfig(final String topic) { + try (final Admin adminClient = createAdminClient()) { + final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + final Config config = adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource).get(); + final Properties properties = new Properties(); + for (final ConfigEntry configEntry : config.entries()) { + if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) { + properties.put(configEntry.name(), configEntry.value()); + } + } + return properties; + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } } @@ -353,25 +445,4 @@ public class EmbeddedKafkaCluster { return allTopics.equals(remainingTopics); } } - - private List brokers() { - final List servers = new ArrayList<>(); - for (final KafkaEmbedded broker : brokers) { - servers.add(broker.kafkaServer()); - } - return servers; - } - - public Properties getLogConfig(final String topic) { - return brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.TOPIC, topic); - } - - public Set getAllTopicsInCluster() { - final scala.collection.Iterator topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); - final Set topics = new HashSet<>(); - while (topicsIterator.hasNext()) { - topics.add(topicsIterator.next()); - } - return topics; - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 007da4c0789..536e43b715a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -75,6 +75,7 @@ import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -104,7 +105,6 @@ import static org.apache.kafka.common.utils.Utils.sleep; import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.fail; @@ -294,7 +294,7 @@ public class IntegrationTestUtils { final int replicationCount, final String... topics) { try { - cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); + cluster.deleteAllTopics(); for (final String topic : topics) { cluster.createTopic(topic, partitionCount, replicationCount); } @@ -306,9 +306,9 @@ public class IntegrationTestUtils { public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { try { driver.cleanUp(); - cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); - } catch (final RuntimeException | InterruptedException e) { - LOG.warn("Ignoring failure to clean test state", e); + cluster.deleteAllTopics(); + } catch (final RuntimeException e) { + LOG.warn("Ignoring failure to clean test state"); } } @@ -1167,6 +1167,10 @@ public class IntegrationTestUtils { if (results.size() != expected.size()) { throw new AssertionError(printRecords(results) + " != " + expected); } + // sort expected and results by key before comparing them + expected.sort(Comparator.comparing(e -> e.key().toString())); + results.sort(Comparator.comparing(e -> e.key().toString())); + final Iterator> expectedIterator = expected.iterator(); for (final ConsumerRecord result : results) { final KeyValueTimestamp expected1 = expectedIterator.next(); @@ -1178,28 +1182,6 @@ public class IntegrationTestUtils { } } - public static void verifyKeyValueTimestamps(final Properties consumerConfig, - final String topic, - final Set> expected) { - final List> results; - try { - results = waitUntilMinRecordsReceived(consumerConfig, topic, expected.size()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - if (results.size() != expected.size()) { - throw new AssertionError(printRecords(results) + " != " + expected); - } - - final Set> actual = - results.stream() - .map(result -> new KeyValueTimestamp<>(result.key(), result.value(), result.timestamp())) - .collect(Collectors.toSet()); - - assertThat(actual, equalTo(expected)); - } - private static void compareKeyValueTimestamp(final ConsumerRecord record, final K expectedKey, final V expectedValue, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java deleted file mode 100644 index 5937d287e0c..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration.utils; - -import kafka.cluster.EndPoint; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.TestUtils; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ServerConfigs; -import org.apache.kafka.server.config.ZkConfigs; -import org.apache.kafka.server.util.MockTime; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.NUM_PARTITIONS_CONFIG; - - -/** - * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by - * default. - *

- * Requires a running ZooKeeper instance to connect to. - */ -public class KafkaEmbedded { - - private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class); - - private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181"; - - private final Properties effectiveConfig; - private final File logDir; - private final File tmpFolder; - private final KafkaServer kafka; - - /** - * Creates and starts an embedded Kafka broker. - * - * @param config Broker configuration settings. Used to modify, for example, on which port the - * broker should listen to. Note that you cannot change the `log.dirs` setting - * currently. - */ - @SuppressWarnings({"WeakerAccess", "this-escape"}) - public KafkaEmbedded(final Properties config, final MockTime time) throws IOException { - tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory(); - logDir = org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log"); - effectiveConfig = effectiveConfigFrom(config); - final boolean loggingEnabled = true; - final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); - log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", - logDir, zookeeperConnect()); - kafka = TestUtils.createServer(kafkaConfig, time); - log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", - brokerList(), zookeeperConnect()); - } - - /** - * Creates the configuration for starting the Kafka broker by merging default values with - * overwrites. - * - * @param initialConfig Broker configuration settings that override the default config. - */ - private Properties effectiveConfigFrom(final Properties initialConfig) { - final Properties effectiveConfig = new Properties(); - effectiveConfig.put(ServerConfigs.BROKER_ID_CONFIG, 0); - effectiveConfig.put(NUM_PARTITIONS_CONFIG, 1); - effectiveConfig.put(AUTO_CREATE_TOPICS_ENABLE_CONFIG, true); - effectiveConfig.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, 1000000); - effectiveConfig.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true); - effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, 10000); - - effectiveConfig.putAll(initialConfig); - effectiveConfig.setProperty(LOG_DIR_CONFIG, logDir.getAbsolutePath()); - return effectiveConfig; - } - - /** - * This broker's `metadata.broker.list` value. Example: `localhost:9092`. - *

- * You can use this to tell Kafka producers and consumers how to connect to this instance. - */ - @SuppressWarnings("WeakerAccess") - public String brokerList() { - final EndPoint endPoint = kafka.advertisedListeners().head(); - return endPoint.host() + ":" + endPoint.port(); - } - - - /** - * The ZooKeeper connection string aka `zookeeper.connect`. - */ - @SuppressWarnings("WeakerAccess") - public String zookeeperConnect() { - return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT); - } - - @SuppressWarnings("WeakerAccess") - public void stopAsync() { - log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", - brokerList(), zookeeperConnect()); - kafka.shutdown(); - } - - @SuppressWarnings("WeakerAccess") - public void awaitStoppedAndPurge() { - kafka.awaitShutdown(); - log.debug("Removing log dir at {} ...", logDir); - try { - Utils.delete(tmpFolder); - } catch (final IOException e) { - throw new RuntimeException(e); - } - log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", - brokerList(), zookeeperConnect()); - } - - /** - * Create a Kafka topic with 1 partition and a replication factor of 1. - * - * @param topic The name of the topic. - */ - public void createTopic(final String topic) { - createTopic(topic, 1, 1, Collections.emptyMap()); - } - - /** - * Create a Kafka topic with the given parameters. - * - * @param topic The name of the topic. - * @param partitions The number of partitions for this topic. - * @param replication The replication factor for (the partitions of) this topic. - */ - public void createTopic(final String topic, final int partitions, final int replication) { - createTopic(topic, partitions, replication, Collections.emptyMap()); - } - - /** - * Create a Kafka topic with the given parameters. - * - * @param topic The name of the topic. - * @param partitions The number of partitions for this topic. - * @param replication The replication factor for (partitions of) this topic. - * @param topicConfig Additional topic-level configuration settings. - */ - public void createTopic(final String topic, - final int partitions, - final int replication, - final Map topicConfig) { - log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", - topic, partitions, replication, topicConfig); - final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication); - newTopic.configs(topicConfig); - - try (final Admin adminClient = createAdminClient()) { - adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); - } catch (final InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("WeakerAccess") - public Admin createAdminClient() { - final Properties adminClientConfig = new Properties(); - adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList()); - final Object listeners = effectiveConfig.get(SocketServerConfigs.LISTENERS_CONFIG); - if (listeners != null && listeners.toString().contains("SSL")) { - adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); - adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - } - return Admin.create(adminClientConfig); - } - - @SuppressWarnings("WeakerAccess") - public void deleteTopic(final String topic) { - log.debug("Deleting topic { name: {} }", topic); - try (final Admin adminClient = createAdminClient()) { - adminClient.deleteTopics(Collections.singletonList(topic)).all().get(); - } catch (final InterruptedException | ExecutionException e) { - if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) { - throw new RuntimeException(e); - } - } - } - - @SuppressWarnings("WeakerAccess") - public KafkaServer kafkaServer() { - return kafka; - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java index 56218c72900..1aeaa45d92c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java @@ -114,7 +114,7 @@ public class HandlingSourceTopicDeletionIntegrationTest { () -> "Kafka Streams clients did not reach state RUNNING" ); - CLUSTER.deleteTopicAndWait(INPUT_TOPIC); + CLUSTER.deleteTopic(INPUT_TOPIC); TestUtils.waitForCondition( () -> kafkaStreams1.state() == State.ERROR && kafkaStreams2.state() == State.ERROR,