rebase to fix merge conflict (#17702)

Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy < lbrutschy@confluent.io>
This commit is contained in:
A. Sophie Blee-Goldman 2024-11-08 19:04:07 -08:00 committed by GitHub
parent f7d2a8cd52
commit 32d9dec9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 23 deletions

View File

@ -239,7 +239,7 @@ public class StreamsProducer {
* @throws IllegalStateException if EOS is disabled
* @throws TaskMigratedException
*/
protected void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
final ConsumerGroupMetadata consumerGroupMetadata) {
if (!eosEnabled()) {
throw new IllegalStateException(formatException("Exactly-once is not enabled"));

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -44,7 +43,6 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
@ -233,7 +231,7 @@ public class TopologyTestDriver implements Closeable {
private final MockConsumer<byte[], byte[]> consumer;
private final MockProducer<byte[], byte[]> producer;
private final TestDriverProducer testDriverProducer;
private final StreamsProducer testDriverProducer;
private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<>();
private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<>();
@ -345,7 +343,8 @@ public class TopologyTestDriver implements Closeable {
return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
}
};
testDriverProducer = new TestDriverProducer(
testDriverProducer = new StreamsProducer(
producer,
StreamsConfigUtils.processingMode(streamsConfig),
mockWallClockTime,
@ -739,7 +738,14 @@ public class TopologyTestDriver implements Closeable {
public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
return new TestInputTopic<>(this, topicName, keySerializer, valueSerializer, Instant.now(), Duration.ZERO);
return new TestInputTopic<>(
this,
topicName,
keySerializer,
valueSerializer,
Instant.ofEpochMilli(mockWallClockTime.milliseconds()),
Duration.ZERO
);
}
/**
@ -986,7 +992,7 @@ public class TopologyTestDriver implements Closeable {
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedKeyValueStore) {
log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
log.warn("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
}
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
@ -1064,7 +1070,7 @@ public class TopologyTestDriver implements Closeable {
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
if (store instanceof TimestampedWindowStore) {
log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
log.warn("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
}
return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
@ -1352,19 +1358,4 @@ public class TopologyTestDriver implements Closeable {
}
}
private static class TestDriverProducer extends StreamsProducer {
public TestDriverProducer(final Producer<byte[], byte[]> producer,
final ProcessingMode processingMode,
final Time time,
final LogContext logContext) {
super(producer, processingMode, time, logContext);
}
@Override
public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
final ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
super.commitTransaction(offsets, consumerGroupMetadata);
}
}
}