diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 584fcb4a25a..c4fc8c434e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -1001,12 +1000,25 @@ public class EosIntegrationTest { } - private final AtomicReference transactionalProducerId = new AtomicReference<>(); + private final AtomicReference> transactionalProducerIdEosV1 = new AtomicReference<>(); + private final AtomicReference transactionalProducerIdEosV2 = new AtomicReference<>(); private class TestClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer getProducer(final Map config) { - transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + final String transactionalId = (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + + if (transactionalId.endsWith("-0_0") || transactionalId.endsWith("-0_1")) { + Map transactionalIds = transactionalProducerIdEosV1.get(); + if (transactionalIds == null) { + transactionalIds = new HashMap<>(); + transactionalProducerIdEosV1.set(transactionalIds); + } + + transactionalIds.put(transactionalId.substring(transactionalId.length() - 3), transactionalId); + } else { + transactionalProducerIdEosV2.compareAndSet(null, transactionalId); + } return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); } @@ -1015,8 +1027,12 @@ public class EosIntegrationTest { static final AtomicReference TASK_WITH_DATA = new AtomicReference<>(); static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false); - @Test - public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception { + @ParameterizedTest + @ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2}) + public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String eosConfig) throws Exception { + TASK_WITH_DATA.set(null); + DID_REVOKE_IDLE_TASK.set(false); + final AtomicBoolean requestCommit = new AtomicBoolean(false); final StreamsBuilder builder = new StreamsBuilder(); @@ -1041,7 +1057,7 @@ public class EosIntegrationTest { .to(SINGLE_PARTITION_OUTPUT_TOPIC); final Properties properties = new Properties(); - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); @@ -1104,7 +1120,11 @@ public class EosIntegrationTest { final List> inputDataTask0Fencing = Collections.singletonList(KeyValue.pair(4L, -3L)); final Properties producerConfigs = new Properties(); - producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerId.get()); + if (eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) { + producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV1.get().get(TASK_WITH_DATA.get().toString())); + } else { + producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV2.get()); + } IntegrationTestUtils.produceKeyValuesSynchronously( MULTI_PARTITION_INPUT_TOPIC, inputDataTask0Fencing,