KAFKA-19666: Remove old restoration codepath from RestoreIntegrationTest [4/N] (#20498)

Clean up `RestoreIntegrationTest.java`

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Shashank 2025-09-11 07:06:25 -07:00 committed by GitHub
parent 8a79ea2e5b
commit dd824a2e74
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 21 additions and 53 deletions

View File

@ -43,7 +43,6 @@ import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@ -77,7 +76,6 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -161,8 +159,8 @@ public class RestoreIntegrationTest {
CLUSTER.createTopic(inputStream, 2, 1); CLUSTER.createTopic(inputStream, 2, 1);
} }
private Properties props(final boolean stateUpdaterEnabled) { private Properties props() {
return props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled)))); return props(mkObjectProperties(mkMap()));
} }
private Properties props(final Properties extraProperties) { private Properties props(final Properties extraProperties) {
@ -267,17 +265,12 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useNewProtocol) throws Exception {
"true,false",
"false,true",
"false,false"
})
public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0); final AtomicInteger numReceived = new AtomicInteger(0);
final Topology topology = new Topology(); final Topology topology = new Topology();
final Properties props = props(stateUpdaterEnabled); final Properties props = props();
if (useNewProtocol) { if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
} }
@ -338,17 +331,12 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNewProtocol) throws Exception {
"true,false",
"false,true",
"false,false"
})
public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0); final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final Properties props = props(stateUpdaterEnabled); final Properties props = props();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
if (useNewProtocol) { if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
@ -413,20 +401,15 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol) throws Exception {
"true,false",
"false,true",
"false,false"
})
public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
final String changelog = appId + "-store-changelog"; final String changelog = appId + "-store-changelog";
CLUSTER.createTopic(changelog, 2, 1); CLUSTER.createTopic(changelog, 2, 1);
final AtomicInteger numReceived = new AtomicInteger(0); final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final Properties props = props(stateUpdaterEnabled); final Properties props = props();
if (useNewProtocol) { if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
@ -474,13 +457,8 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProtocol) throws InterruptedException {
"true,false",
"false,true",
"false,false"
})
public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, Integer> stream = builder.stream(inputStream); final KStream<Integer, Integer> stream = builder.stream(inputStream);
@ -490,7 +468,7 @@ public class RestoreIntegrationTest {
Integer::sum, Integer::sum,
Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled() Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled()
); );
final Properties props = props(stateUpdaterEnabled); final Properties props = props();
if (useNewProtocol) { if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
} }
@ -503,13 +481,8 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewProtocol) throws InterruptedException {
"true,false",
"false,true",
"false,false"
})
public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
asList(KeyValue.pair(1, 1), asList(KeyValue.pair(1, 1),
KeyValue.pair(2, 2), KeyValue.pair(2, 2),
@ -537,7 +510,7 @@ public class RestoreIntegrationTest {
final Topology topology = streamsBuilder.build(); final Topology topology = streamsBuilder.build();
final Properties props = props(stateUpdaterEnabled); final Properties props = props();
if (useNewProtocol) { if (useNewProtocol) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
@ -558,13 +531,8 @@ public class RestoreIntegrationTest {
} }
@ParameterizedTest @ParameterizedTest
@CsvSource({ @ValueSource(booleans = {true, false})
"true,true", public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean useNewProtocol) throws Exception {
"true,false",
"false,true",
"false,false"
})
public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
builder.table( builder.table(
inputStream, inputStream,
@ -576,7 +544,7 @@ public class RestoreIntegrationTest {
CLUSTER.setGroupStandbyReplicas(appId, 1); CLUSTER.setGroupStandbyReplicas(appId, 1);
} }
final Properties props1 = props(stateUpdaterEnabled); final Properties props1 = props();
props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath()); props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath());
if (useNewProtocol) { if (useNewProtocol) {
@ -585,7 +553,7 @@ public class RestoreIntegrationTest {
purgeLocalStreamsState(props1); purgeLocalStreamsState(props1);
final KafkaStreams streams1 = new KafkaStreams(builder.build(), props1); final KafkaStreams streams1 = new KafkaStreams(builder.build(), props1);
final Properties props2 = props(stateUpdaterEnabled); final Properties props2 = props();
props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath()); props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath());
if (useNewProtocol) { if (useNewProtocol) {