KAFKA-19118: Enable KIP-1071 in InternalTopicIntegrationTest (#19425)

KIP-1071 creates internal topics broker-side, so this test checks
whether, when KIP-1071 is enabled, basically the same topics are
created.

It also adds a little helper method in `EmbeddedKafkaCluster`, so that
fewer code changes are required to enable KIP-1071. We use that helper
in the already enabled SmokeTestDriverIntegrationTest and revert some of
the changes there (making the cluster `final` again).

Reviewers: Bill Bejeck <bbejeck@apache.org>, PoAn Yang
 <payang@apache.org>
This commit is contained in:
Lucas Brutschy 2025-04-10 16:57:38 +02:00 committed by GitHub
parent c3b7aa6e64
commit c65a161cd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 42 additions and 25 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
@ -48,8 +49,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
@ -73,7 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600) @Timeout(600)
@Tag("integration") @Tag("integration")
public class InternalTopicIntegrationTest { public class InternalTopicIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
@BeforeAll @BeforeAll
public static void startCluster() throws IOException, InterruptedException { public static void startCluster() throws IOException, InterruptedException {
@ -147,14 +149,22 @@ public class InternalTopicIntegrationTest {
return Admin.create(adminClientConfig); return Admin.create(adminClientConfig);
} }
private void configureStreams(final boolean streamsProtocolEnabled, final String appID) {
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
if (streamsProtocolEnabled) {
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
}
/* /*
* This test just ensures that the assignor does not get stuck during partition number resolution * This test just ensures that the assignor does not get stuck during partition number resolution
* for internal repartition topics. See KAFKA-10689 * for internal repartition topics. See KAFKA-10689
*/ */
@Test @ParameterizedTest
public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception { @ValueSource(booleans = {true, false})
final String appID = APP_ID + "-windowed-FKJ"; public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled) throws Exception {
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
final StreamsBuilder streamsBuilder = new StreamsBuilder(); final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC); final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
@ -179,10 +189,12 @@ public class InternalTopicIntegrationTest {
} }
} }
@Test
public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception { @ParameterizedTest
final String appID = APP_ID + "-compact"; @ValueSource(booleans = {true, false})
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
final String appID = APP_ID + "-compact-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
// //
// Step 1: Configure and start a simple word count topology // Step 1: Configure and start a simple word count topology
@ -216,10 +228,11 @@ public class InternalTopicIntegrationTest {
assertEquals(4, repartitionProps.size()); assertEquals(4, repartitionProps.size());
} }
@Test @ParameterizedTest
public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception { @ValueSource(booleans = {true, false})
final String appID = APP_ID + "-compact-delete"; public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
// //
// Step 1: Configure and start a simple word count topology // Step 1: Configure and start a simple word count topology

View File

@ -18,8 +18,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@ -53,21 +51,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600) @Timeout(600)
@Tag("integration") @Tag("integration")
public class SmokeTestDriverIntegrationTest { public class SmokeTestDriverIntegrationTest {
public static EmbeddedKafkaCluster cluster; public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
public TestInfo testInfo; public TestInfo testInfo;
@BeforeAll @BeforeAll
public static void startCluster() throws IOException { public static void startCluster() throws IOException {
final Properties props = new Properties(); CLUSTER.start();
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
cluster = new EmbeddedKafkaCluster(3, props);
cluster.start();
} }
@AfterAll @AfterAll
public static void closeCluster() { public static void closeCluster() {
cluster.stop(); CLUSTER.stop();
} }
@BeforeEach @BeforeEach
@ -135,9 +129,9 @@ public class SmokeTestDriverIntegrationTest {
int numClientsCreated = 0; int numClientsCreated = 0;
final ArrayList<SmokeTestClient> clients = new ArrayList<>(); final ArrayList<SmokeTestClient> clients = new ArrayList<>();
IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics()); IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
final String bootstrapServers = cluster.bootstrapServers(); final String bootstrapServers = CLUSTER.bootstrapServers();
final Driver driver = new Driver(bootstrapServers, 10, 1000); final Driver driver = new Driver(bootstrapServers, 10, 1000);
driver.start(); driver.start();
System.out.println("started driver"); System.out.println("started driver");

View File

@ -140,6 +140,16 @@ public class EmbeddedKafkaCluster {
this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
} }
public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) {
return withStreamsRebalanceProtocol(numBrokers, new Properties());
}
public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) {
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
return new EmbeddedKafkaCluster(numBrokers, props);
}
public void start() { public void start() {
try { try {
cluster.format(); cluster.format();