diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index dcd711a35c5..20fc7f47236 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -57,11 +57,12 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.lang.reflect.Field; @@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import static java.util.Collections.singleton; import static org.apache.kafka.streams.query.StateQueryRequest.inStore; @@ -99,6 +101,7 @@ public class IQv2IntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private KafkaStreams kafkaStreams; + private String groupProtocol; @BeforeAll public static void before() @@ -149,8 +152,8 @@ public class IQv2IntegrationTest { )); } - @BeforeEach - public void beforeTest(final TestInfo testInfo) { + private void setup(final String groupProtocol, final TestInfo testInfo) { + this.groupProtocol = groupProtocol; final StreamsBuilder builder = new StreamsBuilder(); builder.table( @@ -159,7 +162,6 @@ public class IQv2IntegrationTest { Materialized.as(STORE_NAME) ); - final String safeTestName = safeUniqueTestName(testInfo); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); @@ -167,8 +169,10 @@ public class IQv2IntegrationTest { @AfterEach public void afterTest() { - kafkaStreams.close(Duration.ofSeconds(60)); - kafkaStreams.cleanUp(); + if (kafkaStreams != null) { + kafkaStreams.close(Duration.ofSeconds(60)); + kafkaStreams.cleanUp(); + } } @AfterAll @@ -176,8 +180,10 @@ public class IQv2IntegrationTest { CLUSTER.stop(); } - @Test - public void shouldFailUnknownStore() { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldFailUnknownStore(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final StateQueryRequest> request = inStore("unknown-store").withQuery(query); @@ -185,8 +191,10 @@ public class IQv2IntegrationTest { assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request)); } - @Test - public void shouldFailNotStarted() { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldFailNotStarted(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query); @@ -194,8 +202,10 @@ public class IQv2IntegrationTest { assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request)); } - @Test - public void shouldFailStopped() { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldFailStopped(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query); @@ -205,9 +215,11 @@ public class IQv2IntegrationTest { assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request)); } - @Test - public void shouldRejectNonRunningActive() + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldRejectNonRunningActive(final String groupProtocol, final String testName, final TestInfo testInfo) throws NoSuchFieldException, IllegalAccessException { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final StateQueryRequest> request = inStore(STORE_NAME).withQuery(query).requireActive(); @@ -261,8 +273,10 @@ public class IQv2IntegrationTest { } } - @Test - public void shouldFetchFromPartition() { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldFetchFromPartition(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final int partition = 1; final Set partitions = singleton(partition); @@ -276,8 +290,10 @@ public class IQv2IntegrationTest { assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); } - @Test - public void shouldFetchExplicitlyFromAllPartitions() { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldFetchExplicitlyFromAllPartitions(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final Set partitions = Set.of(0, 1); final StateQueryRequest> request = @@ -290,8 +306,10 @@ public class IQv2IntegrationTest { assertThat(result.getPartitionResults().keySet(), equalTo(partitions)); } - @Test - public void shouldNotRequireQueryHandler(final TestInfo testInfo) { + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void shouldNotRequireQueryHandler(final String groupProtocol, final String testName, final TestInfo testInfo) { + setup(groupProtocol, testInfo); final KeyQuery> query = KeyQuery.withKey(1); final int partition = 1; final Set partitions = singleton(partition); @@ -423,8 +441,11 @@ public class IQv2IntegrationTest { ); // Discard the basic streams and replace with test-specific topology - kafkaStreams.close(); + if (kafkaStreams != null) { + kafkaStreams.close(); + } final String safeTestName = safeUniqueTestName(testInfo); + this.groupProtocol = groupProtocol; kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); @@ -446,7 +467,7 @@ public class IQv2IntegrationTest { private Properties streamsConfiguration(final String safeTestName) { final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName + "-" + groupProtocol); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); @@ -458,6 +479,14 @@ public class IQv2IntegrationTest { config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); return config; } + + private static Stream groupProtocolParameters() { + return Stream.of( + Arguments.of("classic", "CLASSIC protocol"), + Arguments.of("streams", "STREAMS protocol") + ); + } } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 963183fd665..649799c7976 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -360,7 +360,9 @@ public class IQv2StoreIntegrationTest { for (final boolean logEnabled : Arrays.asList(true, false)) { for (final StoresToTest toTest : StoresToTest.values()) { for (final String kind : Arrays.asList("DSL", "PAPI")) { - values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind)); + for (final String groupProtocol : Arrays.asList("classic", "streams")) { + values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind, groupProtocol)); + } } } } @@ -426,13 +428,14 @@ public class IQv2StoreIntegrationTest { )); } - public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) { + public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) { final StoreSupplier supplier = storeToTest.supplier(); final Properties streamsConfig = streamsConfiguration( cache, log, storeToTest.name(), - kind + kind, + groupProtocol ); final StreamsBuilder builder = new StreamsBuilder(); @@ -765,8 +768,8 @@ public class IQv2StoreIntegrationTest { @ParameterizedTest @MethodSource("data") - public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) { - setup(cache, log, storeToTest, kind); + public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) { + setup(cache, log, storeToTest, kind, groupProtocol); try { if (storeToTest.global()) { // See KAFKA-13523 @@ -2030,10 +2033,10 @@ public class IQv2StoreIntegrationTest { } private static Properties streamsConfiguration(final boolean cache, final boolean log, - final String supplier, final String kind) { + final String supplier, final String kind, final String groupProtocol) { final String safeTestName = IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier - + "-" + kind + "-" + RANDOM.nextInt(); + + "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt(); final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); @@ -2048,6 +2051,7 @@ public class IQv2StoreIntegrationTest { config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); return config; } } \ No newline at end of file diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java index 26a5aed5a52..bb1f3c20a87 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java @@ -47,7 +47,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; import java.time.Instant; @@ -57,7 +60,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Properties; +import java.util.stream.Stream; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -83,16 +88,25 @@ public class IQv2VersionedStoreIntegrationTest { private static final Long[] RECORD_TIMESTAMPS = {BASE_TIMESTAMP_LONG, BASE_TIMESTAMP_LONG + 10, BASE_TIMESTAMP_LONG + 20, BASE_TIMESTAMP_LONG + 30}; private static final int RECORD_NUMBER = RECORD_VALUES.length; private static final int LAST_INDEX = RECORD_NUMBER - 1; - private static final Position INPUT_POSITION = Position.emptyPosition(); + private Position inputPosition; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true"))); private KafkaStreams kafkaStreams; + private String groupProtocol; @BeforeAll - public static void before() throws Exception { + public static void beforeAll() throws Exception { CLUSTER.start(); - + } + + @BeforeEach + public void beforeEach() throws Exception { + // Delete and recreate the topic to ensure clean state for each test + CLUSTER.deleteTopic(INPUT_TOPIC_NAME); + CLUSTER.createTopic(INPUT_TOPIC_NAME, 1, 1); + + // Set up fresh test data final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); @@ -103,19 +117,21 @@ public class IQv2VersionedStoreIntegrationTest { producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get(); producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get(); } - INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3); + inputPosition = Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 3); } - @BeforeEach - public void beforeTest() { + private void setup(final String groupProtocol, final TestInfo testInfo) { + this.groupProtocol = groupProtocol; final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC_NAME, Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL))); final Properties configs = new Properties(); - configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app"); + final String safeTestName = safeUniqueTestName(testInfo); + configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName()); configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName()); + configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); kafkaStreams = IntegrationTestUtils.getStartedStreams(configs, builder, true); } @@ -132,8 +148,19 @@ public class IQv2VersionedStoreIntegrationTest { CLUSTER.stop(); } - @Test - public void verifyStore() { + private static Stream groupProtocolParameters() { + return Stream.of( + Arguments.of("classic", "CLASSIC protocol"), + Arguments.of("streams", "STREAMS protocol") + ); + } + + @ParameterizedTest(name = "{1}") + @MethodSource("groupProtocolParameters") + public void verifyStore(final String groupProtocol, final String testName, final TestInfo testInfo) throws Exception { + // Set up streams + setup(groupProtocol, testInfo); + /* Test Versioned Key Queries */ // retrieve the latest value shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty()); @@ -255,7 +282,10 @@ public class IQv2VersionedStoreIntegrationTest { private void shouldHandleRaceCondition() { final MultiVersionedKeyQuery query = defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY); - final Map>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams); + // For race condition test, we don't use position bounds since we're testing concurrent updates + final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query); + final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + final Map>> partitionResults = result.getPartitionResults(); // verify results in two steps for (final Entry>> partitionResultsEntry : partitionResults.entrySet()) { @@ -327,14 +357,14 @@ public class IQv2VersionedStoreIntegrationTest { return query; } - private static Map>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery query, final KafkaStreams kafkaStreams) { - final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + private Map>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery query, final KafkaStreams kafkaStreams) { + final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition)); final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); return result.getPartitionResults(); } - private static QueryResult> sendRequestAndReceiveResults(final VersionedKeyQuery query, final KafkaStreams kafkaStreams) { - final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); + private QueryResult> sendRequestAndReceiveResults(final VersionedKeyQuery query, final KafkaStreams kafkaStreams) { + final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition)); final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); return result.getOnlyPartitionResult(); } @@ -352,7 +382,7 @@ public class IQv2VersionedStoreIntegrationTest { /** * This method inserts a new value (999999) for the key in the oldest timestamp (RECORD_TIMESTAMPS[0]). */ - private static void updateRecordValue() { + private void updateRecordValue() { // update the record value at RECORD_TIMESTAMPS[0] final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -361,8 +391,9 @@ public class IQv2VersionedStoreIntegrationTest { try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999)); } - INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4); - assertThat(INPUT_POSITION, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4))); + + inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 4); + assertThat(inputPosition, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4))); // make sure that the new value is picked up by the store final Properties consumerProps = new Properties();