KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312)

After cherry-picking
2181ddbb03,
we realized that the newly added test does not cover EOSv1. This PR closes this testing gap.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-03-31 10:56:58 -07:00 committed by GitHub
parent 90ee2d2b34
commit 4b86ff9d00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 27 additions and 7 deletions

View File

@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.CsvSource;
@ -1001,12 +1000,25 @@ public class EosIntegrationTest {
} }
private final AtomicReference<String> transactionalProducerId = new AtomicReference<>(); private final AtomicReference<Map<String, String>> transactionalProducerIdEosV1 = new AtomicReference<>();
private final AtomicReference<String> transactionalProducerIdEosV2 = new AtomicReference<>();
private class TestClientSupplier extends DefaultKafkaClientSupplier { private class TestClientSupplier extends DefaultKafkaClientSupplier {
@Override @Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); final String transactionalId = (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (transactionalId.endsWith("-0_0") || transactionalId.endsWith("-0_1")) {
Map<String, String> transactionalIds = transactionalProducerIdEosV1.get();
if (transactionalIds == null) {
transactionalIds = new HashMap<>();
transactionalProducerIdEosV1.set(transactionalIds);
}
transactionalIds.put(transactionalId.substring(transactionalId.length() - 3), transactionalId);
} else {
transactionalProducerIdEosV2.compareAndSet(null, transactionalId);
}
return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
} }
@ -1015,8 +1027,12 @@ public class EosIntegrationTest {
static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>(); static final AtomicReference<TaskId> TASK_WITH_DATA = new AtomicReference<>();
static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false); static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
@Test @ParameterizedTest
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception { @ValueSource(strings = {StreamsConfig.EXACTLY_ONCE, StreamsConfig.EXACTLY_ONCE_V2})
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final String eosConfig) throws Exception {
TASK_WITH_DATA.set(null);
DID_REVOKE_IDLE_TASK.set(false);
final AtomicBoolean requestCommit = new AtomicBoolean(false); final AtomicBoolean requestCommit = new AtomicBoolean(false);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -1041,7 +1057,7 @@ public class EosIntegrationTest {
.to(SINGLE_PARTITION_OUTPUT_TOPIC); .to(SINGLE_PARTITION_OUTPUT_TOPIC);
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
@ -1104,7 +1120,11 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> inputDataTask0Fencing = Collections.singletonList(KeyValue.pair(4L, -3L)); final List<KeyValue<Long, Long>> inputDataTask0Fencing = Collections.singletonList(KeyValue.pair(4L, -3L));
final Properties producerConfigs = new Properties(); final Properties producerConfigs = new Properties();
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerId.get()); if (eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV1.get().get(TASK_WITH_DATA.get().toString()));
} else {
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalProducerIdEosV2.get());
}
IntegrationTestUtils.produceKeyValuesSynchronously( IntegrationTestUtils.produceKeyValuesSynchronously(
MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
inputDataTask0Fencing, inputDataTask0Fencing,