From 4b86ff9d00eaa1f93a99a60b6c321abbabbdb636 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 31 Mar 2025 10:56:58 -0700 Subject: [PATCH] KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312) After cherry-picking https://github.com/apache/kafka/commit/2181ddbb039ff688f5ff41784d943cb579f7575c, we realized that the newly added test does not cover EOSv1. This PR closes this testing gap. Reviewers: Lucas Brutschy --- .../integration/EosIntegrationTest.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 584fcb4a25a..c4fc8c434e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -1001,12 +1000,25 @@ public class EosIntegrationTest { } - private final AtomicReference transactionalProducerId = new AtomicReference<>(); + private final AtomicReference> transactionalProducerIdEosV1 = new AtomicReference<>(); + private final AtomicReference transactionalProducerIdEosV2 = new AtomicReference<>(); private class TestClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer getProducer(final Map config) { - transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + final String transactionalId = (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + + if (transactionalId.endsWith("-0_0") || transactionalId.endsWith("-0_1")) { + Map transactionalIds = transactionalProducerIdEosV1.get(); + if (transactionalIds == null) { + transactionalIds = new HashMap<>(); + transactionalProducerIdEosV1.set(transactionalIds); + } + + transactionalIds.put(transactionalId.substring(transactionalId.length() - 3), transactionalId); + } else { + transactionalProducerIdEosV2.compareAndSet(null, transactionalId); + } return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); } @@ -1015,8 +1027,12 @@ public class EosIntegrationTest { static final AtomicReference TASK_WITH_DATA = new AtomicReference<>(); static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false); - @Test - public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception { + @ParameterizedTest + @ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2}) + public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String eosConfig) throws Exception { + TASK_WITH_DATA.set(null); + DID_REVOKE_IDLE_TASK.set(false); + final AtomicBoolean requestCommit = new AtomicBoolean(false); final StreamsBuilder builder = new StreamsBuilder(); @@ -1041,7 +1057,7 @@ public class EosIntegrationTest { .to(SINGLE_PARTITION_OUTPUT_TOPIC); final Properties properties = new Properties(); - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); @@ -1104,7 +1120,11 @@ public class EosIntegrationTest { final List> inputDataTask0Fencing = Collections.singletonList(KeyValue.pair(4L, -3L)); final Properties producerConfigs = new Properties(); - producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerId.get()); + if (eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) { + producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV1.get().get(TASK_WITH_DATA.get().toString())); + } else { + producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV2.get()); + } IntegrationTestUtils.produceKeyValuesSynchronously( MULTI_PARTITION_INPUT_TOPIC, inputDataTask0Fencing,