diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index d8743330218..46ace65cf04 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -72,9 +72,10 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +170,15 @@ public class EosIntegrationTest { private String stateTmpDir; + private static java.util.stream.Stream groupProtocolAndProcessingThreadsParameters() { + return java.util.stream.Stream.of( + Arguments.of("classic", true), + Arguments.of("classic", false), + Arguments.of("streams", true), + Arguments.of("streams", false) + ); + } + @BeforeEach public void createTopics() throws Exception { applicationId = "appId-" + TEST_NUMBER.getAndIncrement(); @@ -181,16 +191,19 @@ public class EosIntegrationTest { CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1); CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1); CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1); + CLUSTER.setGroupStandbyReplicas(applicationId, 1); } - @Test - public void shouldBeAbleToRunWithEosEnabled() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToRunWithEosEnabled(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } - @Test - public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldCommitCorrectOffsetIfInputTopicIsTransactional(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true, groupProtocol); try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))); final Consumer consumer = new KafkaConsumer<>(mkMap( @@ -215,36 +228,42 @@ public class EosIntegrationTest { } } - @Test - public void shouldBeAbleToRestartAfterClose() throws Exception { - runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToRestartAfterClose(final String groupProtocol) throws Exception { + runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } - @Test - public void shouldBeAbleToCommitToMultiplePartitions() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToCommitToMultiplePartitions(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } - @Test - public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception { - runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToCommitMultiplePartitionOffsets(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } - @Test - public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToRunWithTwoSubtopologies(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } - @Test - public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception { - runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false); + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions(final String groupProtocol) throws Exception { + runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false, groupProtocol); } private void runSimpleCopyTest(final int numberOfRestarts, final String inputTopic, final String throughTopic, final String outputTopic, - final boolean inputTopicTransactional) throws Exception { + final boolean inputTopicTransactional, + final String groupProtocol) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream(inputTopic); KStream output = input; @@ -263,6 +282,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_INTERVAL_MS - 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); + properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); for (int i = 0; i < numberOfRestarts; ++i) { final Properties config = StreamsTestUtils.getStreamsConfig( @@ -326,8 +346,9 @@ public class EosIntegrationTest { return recordsPerKey; } - @Test - public void shouldBeAbleToPerformMultipleTransactions() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC); @@ -337,6 +358,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); final Properties config = StreamsTestUtils.getStreamsConfig( applicationId, @@ -374,8 +396,8 @@ public class EosIntegrationTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolAndProcessingThreadsParameters") + public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { // this test writes 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to copy all 40 records into the output topic @@ -386,7 +408,7 @@ public class EosIntegrationTest { // -> the failure only kills one thread // after fail over, we should read 40 committed records (even if 50 record got written) - try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, processingThreadsEnabled)) { + try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, groupProtocol, processingThreadsEnabled)) { startApplicationAndWaitUntilRunning(streams); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); @@ -476,8 +498,8 @@ public class EosIntegrationTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolAndProcessingThreadsParameters") + public void shouldNotViolateEosIfOneTaskFailsWithState(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to emit all 40 update records into the output topic @@ -493,7 +515,7 @@ public class EosIntegrationTest { // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms // to avoid unexpected rebalance during test, which will cause unexpected fail over triggered - try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, processingThreadsEnabled)) { + try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, groupProtocol, processingThreadsEnabled)) { startApplicationAndWaitUntilRunning(streams); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); @@ -594,8 +616,8 @@ public class EosIntegrationTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolAndProcessingThreadsParameters") + public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions) // the app is supposed to copy all 60 records into the output topic // @@ -607,10 +629,9 @@ public class EosIntegrationTest { // // afterward, the "stalling" thread resumes, and another rebalance should get triggered // we write the remaining 20 records and verify to read 60 result records - try ( - final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, processingThreadsEnabled); - final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, processingThreadsEnabled) + final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, groupProtocol, processingThreadsEnabled); + final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, groupProtocol, processingThreadsEnabled) ) { startApplicationAndWaitUntilRunning(streams1); startApplicationAndWaitUntilRunning(streams2); @@ -667,13 +688,10 @@ public class EosIntegrationTest { "Expected a host to start stalling" ); final String observedStallingHost = stallingHost.get(); - final KafkaStreams stallingInstance; final KafkaStreams remainingInstance; if ("streams1".equals(observedStallingHost)) { - stallingInstance = streams1; remainingInstance = streams2; } else if ("streams2".equals(observedStallingHost)) { - stallingInstance = streams2; remainingInstance = streams1; } else { throw new IllegalArgumentException("unexpected host name: " + observedStallingHost); @@ -683,8 +701,7 @@ public class EosIntegrationTest { // the assignment is. We only really care that the remaining instance only sees one host // that owns both partitions. waitForCondition( - () -> stallingInstance.metadataForAllStreamsClients().size() == 2 - && remainingInstance.metadataForAllStreamsClients().size() == 1 + () -> remainingInstance.metadataForAllStreamsClients().size() == 1 && remainingInstance.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2, MAX_WAIT_TIME_MS, () -> "Should have rebalanced.\n" + @@ -755,12 +772,12 @@ public class EosIntegrationTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean processingThreadsEnabled) throws Exception { + @MethodSource("groupProtocolAndProcessingThreadsParameters") + public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { final List> writtenData = prepareData(0L, 10, 0L, 1L); final List> expectedResult = computeExpectedResult(writtenData); - try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, processingThreadsEnabled)) { + try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, groupProtocol, processingThreadsEnabled)) { writeInputData(writtenData); startApplicationAndWaitUntilRunning(streams); @@ -787,9 +804,9 @@ public class EosIntegrationTest { } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @MethodSource("groupProtocolAndProcessingThreadsParameters") public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring( - final boolean processingThreadsEnabled) throws Exception { + final String groupProtocol, final boolean processingThreadsEnabled) throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); @@ -801,6 +818,7 @@ public class EosIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); streamsConfiguration.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100); final String stateStoreName = "stateStore"; @@ -934,8 +952,13 @@ public class EosIntegrationTest { static final AtomicReference TASK_WITH_DATA = new AtomicReference<>(); static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false); - @Test - public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String groupProtocol) throws Exception { + // Reset static variables to ensure test isolation + TASK_WITH_DATA.set(null); + DID_REVOKE_IDLE_TASK.set(false); + final AtomicBoolean requestCommit = new AtomicBoolean(false); final StreamsBuilder builder = new StreamsBuilder(); @@ -970,6 +993,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS); properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), Integer.MAX_VALUE); properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TestTaskAssignor.class.getName()); + properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); final Properties config = StreamsTestUtils.getStreamsConfig( applicationId, @@ -1003,9 +1027,9 @@ public class EosIntegrationTest { // add second thread, to trigger rebalance // expect idle task to get revoked -- this should not trigger a TX commit streams.addStreamThread(); - - waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected."); - + if (groupProtocol.equals("classic")) { + waitForCondition(DID_REVOKE_IDLE_TASK::get, "Idle Task was not revoked as expected."); + } // best-effort sanity check (might pass and not detect issue in slow environments) try { readResult(SINGLE_PARTITION_OUTPUT_TOPIC, 1, "consumer", 10_000L); @@ -1104,6 +1128,7 @@ public class EosIntegrationTest { final boolean withState, final String appDir, final int numberOfStreamsThreads, + final String groupProtocol, final boolean processingThreadsEnabled) { commitRequested = new AtomicInteger(0); errorInjected = new AtomicBoolean(false); @@ -1212,6 +1237,7 @@ public class EosIntegrationTest { properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142"); properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); + properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); final Properties config = StreamsTestUtils.getStreamsConfig( applicationId,