mirror of https://github.com/apache/kafka.git
KAFKA-19666: Remove old restoration codepath from EosIntegrationTest [5/N] (#20499)
clean up `EosIntegrationTest.java` Reviewers: Lucas Brutschy <lucasbru@apache.org>
This commit is contained in:
parent
a244565ed2
commit
709c5fab22
|
@ -788,19 +788,8 @@ public class EosIntegrationTest {
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = {true, false})
|
@ValueSource(booleans = {true, false})
|
||||||
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled(
|
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
|
||||||
final boolean processingThreadsEnabled) throws Exception {
|
final boolean processingThreadsEnabled) throws Exception {
|
||||||
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(processingThreadsEnabled, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled() throws Exception {
|
|
||||||
shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(false, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
|
|
||||||
final boolean processingThreadsEnabled,
|
|
||||||
final boolean stateUpdaterEnabled) throws Exception {
|
|
||||||
|
|
||||||
final Properties streamsConfiguration = new Properties();
|
final Properties streamsConfiguration = new Properties();
|
||||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
|
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
|
||||||
|
@ -812,7 +801,6 @@ public class EosIntegrationTest {
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
|
||||||
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
|
streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
|
||||||
streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
|
|
||||||
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
|
streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
|
||||||
final String stateStoreName = "stateStore";
|
final String stateStoreName = "stateStore";
|
||||||
|
|
||||||
|
@ -1223,7 +1211,6 @@ public class EosIntegrationTest {
|
||||||
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||||
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
|
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
|
||||||
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
|
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
|
||||||
properties.put(InternalConfig.STATE_UPDATER_ENABLED, processingThreadsEnabled);
|
|
||||||
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
|
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
|
||||||
|
|
||||||
final Properties config = StreamsTestUtils.getStreamsConfig(
|
final Properties config = StreamsTestUtils.getStreamsConfig(
|
||||||
|
|
Loading…
Reference in New Issue