KAFKA-19705: Enable streams rebalance protocol in IQv2 integration test (#20541)

Update IQv2 Integration tests for streams group protocol

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Jinhe Zhang 2025-09-18 03:41:52 -04:00 committed by GitHub
parent e647bdcee5
commit 04b4a8f571
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 112 additions and 48 deletions

View File

@ -57,11 +57,12 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.lang.reflect.Field;
@ -76,6 +77,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
@ -99,6 +101,7 @@ public class IQv2IntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private KafkaStreams kafkaStreams;
private String groupProtocol;
@BeforeAll
public static void before()
@ -149,8 +152,8 @@ public class IQv2IntegrationTest {
));
}
@BeforeEach
public void beforeTest(final TestInfo testInfo) {
private void setup(final String groupProtocol, final TestInfo testInfo) {
this.groupProtocol = groupProtocol;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
@ -159,7 +162,6 @@ public class IQv2IntegrationTest {
Materialized.as(STORE_NAME)
);
final String safeTestName = safeUniqueTestName(testInfo);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
@ -167,8 +169,10 @@ public class IQv2IntegrationTest {
@AfterEach
public void afterTest() {
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
}
@AfterAll
@ -176,8 +180,10 @@ public class IQv2IntegrationTest {
CLUSTER.stop();
}
@Test
public void shouldFailUnknownStore() {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldFailUnknownStore(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore("unknown-store").withQuery(query);
@ -185,8 +191,10 @@ public class IQv2IntegrationTest {
assertThrows(UnknownStateStoreException.class, () -> kafkaStreams.query(request));
}
@Test
public void shouldFailNotStarted() {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldFailNotStarted(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
@ -194,8 +202,10 @@ public class IQv2IntegrationTest {
assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.query(request));
}
@Test
public void shouldFailStopped() {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldFailStopped(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query);
@ -205,9 +215,11 @@ public class IQv2IntegrationTest {
assertThrows(StreamsStoppedException.class, () -> kafkaStreams.query(request));
}
@Test
public void shouldRejectNonRunningActive()
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldRejectNonRunningActive(final String groupProtocol, final String testName, final TestInfo testInfo)
throws NoSuchFieldException, IllegalAccessException {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
inStore(STORE_NAME).withQuery(query).requireActive();
@ -261,8 +273,10 @@ public class IQv2IntegrationTest {
}
}
@Test
public void shouldFetchFromPartition() {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldFetchFromPartition(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
@ -276,8 +290,10 @@ public class IQv2IntegrationTest {
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
@Test
public void shouldFetchExplicitlyFromAllPartitions() {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldFetchExplicitlyFromAllPartitions(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final Set<Integer> partitions = Set.of(0, 1);
final StateQueryRequest<ValueAndTimestamp<Integer>> request =
@ -290,8 +306,10 @@ public class IQv2IntegrationTest {
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
@Test
public void shouldNotRequireQueryHandler(final TestInfo testInfo) {
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void shouldNotRequireQueryHandler(final String groupProtocol, final String testName, final TestInfo testInfo) {
setup(groupProtocol, testInfo);
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
@ -423,8 +441,11 @@ public class IQv2IntegrationTest {
);
// Discard the basic streams and replace with test-specific topology
kafkaStreams.close();
if (kafkaStreams != null) {
kafkaStreams.close();
}
final String safeTestName = safeUniqueTestName(testInfo);
this.groupProtocol = groupProtocol;
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName));
kafkaStreams.cleanUp();
@ -446,7 +467,7 @@ public class IQv2IntegrationTest {
private Properties streamsConfiguration(final String safeTestName) {
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName + "-" + groupProtocol);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@ -458,6 +479,14 @@ public class IQv2IntegrationTest {
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
return config;
}
private static Stream<Arguments> groupProtocolParameters() {
return Stream.of(
Arguments.of("classic", "CLASSIC protocol"),
Arguments.of("streams", "STREAMS protocol")
);
}
}

View File

@ -360,7 +360,9 @@ public class IQv2StoreIntegrationTest {
for (final boolean logEnabled : Arrays.asList(true, false)) {
for (final StoresToTest toTest : StoresToTest.values()) {
for (final String kind : Arrays.asList("DSL", "PAPI")) {
values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind));
for (final String groupProtocol : Arrays.asList("classic", "streams")) {
values.add(Arguments.of(cacheEnabled, logEnabled, toTest.name(), kind, groupProtocol));
}
}
}
}
@ -426,13 +428,14 @@ public class IQv2StoreIntegrationTest {
));
}
public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) {
public void setup(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) {
final StoreSupplier<?> supplier = storeToTest.supplier();
final Properties streamsConfig = streamsConfiguration(
cache,
log,
storeToTest.name(),
kind
kind,
groupProtocol
);
final StreamsBuilder builder = new StreamsBuilder();
@ -765,8 +768,8 @@ public class IQv2StoreIntegrationTest {
@ParameterizedTest
@MethodSource("data")
public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind) {
setup(cache, log, storeToTest, kind);
public void verifyStore(final boolean cache, final boolean log, final StoresToTest storeToTest, final String kind, final String groupProtocol) {
setup(cache, log, storeToTest, kind, groupProtocol);
try {
if (storeToTest.global()) {
// See KAFKA-13523
@ -2030,10 +2033,10 @@ public class IQv2StoreIntegrationTest {
}
private static Properties streamsConfiguration(final boolean cache, final boolean log,
final String supplier, final String kind) {
final String supplier, final String kind, final String groupProtocol) {
final String safeTestName =
IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier
+ "-" + kind + "-" + RANDOM.nextInt();
+ "-" + kind + "-" + groupProtocol + "-" + RANDOM.nextInt();
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
@ -2048,6 +2051,7 @@ public class IQv2StoreIntegrationTest {
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
return config;
}
}

View File

@ -47,7 +47,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
@ -57,7 +60,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@ -83,16 +88,25 @@ public class IQv2VersionedStoreIntegrationTest {
private static final Long[] RECORD_TIMESTAMPS = {BASE_TIMESTAMP_LONG, BASE_TIMESTAMP_LONG + 10, BASE_TIMESTAMP_LONG + 20, BASE_TIMESTAMP_LONG + 30};
private static final int RECORD_NUMBER = RECORD_VALUES.length;
private static final int LAST_INDEX = RECORD_NUMBER - 1;
private static final Position INPUT_POSITION = Position.emptyPosition();
private Position inputPosition;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
private KafkaStreams kafkaStreams;
private String groupProtocol;
@BeforeAll
public static void before() throws Exception {
public static void beforeAll() throws Exception {
CLUSTER.start();
}
@BeforeEach
public void beforeEach() throws Exception {
// Delete and recreate the topic to ensure clean state for each test
CLUSTER.deleteTopic(INPUT_TOPIC_NAME);
CLUSTER.createTopic(INPUT_TOPIC_NAME, 1, 1);
// Set up fresh test data
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@ -103,19 +117,21 @@ public class IQv2VersionedStoreIntegrationTest {
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
}
INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
inputPosition = Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 3);
}
@BeforeEach
public void beforeTest() {
private void setup(final String groupProtocol, final TestInfo testInfo) {
this.groupProtocol = groupProtocol;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC_NAME,
Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL)));
final Properties configs = new Properties();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
final String safeTestName = safeUniqueTestName(testInfo);
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
configs.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
kafkaStreams = IntegrationTestUtils.getStartedStreams(configs, builder, true);
}
@ -132,8 +148,19 @@ public class IQv2VersionedStoreIntegrationTest {
CLUSTER.stop();
}
@Test
public void verifyStore() {
private static Stream<Arguments> groupProtocolParameters() {
return Stream.of(
Arguments.of("classic", "CLASSIC protocol"),
Arguments.of("streams", "STREAMS protocol")
);
}
@ParameterizedTest(name = "{1}")
@MethodSource("groupProtocolParameters")
public void verifyStore(final String groupProtocol, final String testName, final TestInfo testInfo) throws Exception {
// Set up streams
setup(groupProtocol, testInfo);
/* Test Versioned Key Queries */
// retrieve the latest value
shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty());
@ -255,7 +282,10 @@ public class IQv2VersionedStoreIntegrationTest {
private void shouldHandleRaceCondition() {
final MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
// For race condition test, we don't use position bounds since we're testing concurrent updates
final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query);
final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = result.getPartitionResults();
// verify results in two steps
for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) {
@ -327,14 +357,14 @@ public class IQv2VersionedStoreIntegrationTest {
return query;
}
private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
private Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
return result.getPartitionResults();
}
private static QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
private QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(inputPosition));
final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
return result.getOnlyPartitionResult();
}
@ -352,7 +382,7 @@ public class IQv2VersionedStoreIntegrationTest {
/**
* This method inserts a new value (999999) for the key in the oldest timestamp (RECORD_TIMESTAMPS[0]).
*/
private static void updateRecordValue() {
private void updateRecordValue() {
// update the record value at RECORD_TIMESTAMPS[0]
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@ -361,8 +391,9 @@ public class IQv2VersionedStoreIntegrationTest {
try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
}
INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4);
assertThat(INPUT_POSITION, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
inputPosition = inputPosition.withComponent(INPUT_TOPIC_NAME, 0, 4);
assertThat(inputPosition, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
// make sure that the new value is picked up by the store
final Properties consumerProps = new Properties();