KAFKA-18038: fix flakey test StreamThreadTest.shouldLogAndRecordSkippedRecordsForInvalidTimestamps (#17889)

With KAFKA-17872, we changed some internals that affect the conditions
of this test, introducing a race condition around when the expected log
messages are printed.

This PR adds additional wait conditions to the test to close the race
condition.
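
For background, the added wait conditions poll the MockConsumer until the expected offset has actually been committed before the test proceeds. A minimal standalone sketch of that pattern, assuming Kafka's TestUtils.waitForCondition test helper and an illustrative partition parameter (the real test drives its own mockTime and runOnce helpers, as shown in the diff below):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.test.TestUtils;

    final class CommitWaitSketch {

        // Illustrative helper: block until the MockConsumer reports a commit at
        // expectedOffset for the given partition, instead of asserting right
        // after a single poll loop (which is where the race lived).
        static void waitForCommit(
            final MockConsumer<byte[], byte[]> consumer,
            final TopicPartition partition,
            final long expectedOffset
        ) throws InterruptedException {
            TestUtils.waitForCondition(() -> {
                final Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(partition));
                final OffsetAndMetadata offset = committed.get(partition);
                return offset != null && offset.offset() == expectedOffset;
            }, "Never committed offset " + expectedOffset);
        }
    }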

Reviewers: Bill Bejeck <bill@confluent.io>
Matthias J. Sax 2024-11-21 11:42:28 -08:00 committed by GitHub
parent 1c998f8ef3
commit 2519e4af0c
1 changed file with 29 additions and 2 deletions


@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
@@ -141,6 +142,7 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClie
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.startsWith;
@@ -262,7 +264,8 @@ public class StreamThreadTest {
            mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
            mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
            mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)),
            mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled))
            mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)),
            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
        ));
    }
@@ -2979,7 +2982,11 @@ public class StreamThreadTest {
    @ParameterizedTest
    @MethodSource("data")
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
        final boolean stateUpdaterEnabled,
        final boolean processingThreadsEnabled
    ) throws Exception {
        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

        final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@@ -3013,12 +3020,20 @@
        addRecord(mockConsumer, ++offset);
        runOnce(processingThreadsEnabled);
        if (processingThreadsEnabled) {
            waitForCommit(mockConsumer, offset + 1);
        }

        addRecord(mockConsumer, ++offset);
        addRecord(mockConsumer, ++offset);
        addRecord(mockConsumer, ++offset);
        addRecord(mockConsumer, ++offset);
        runOnce(processingThreadsEnabled);
        if (processingThreadsEnabled) {
            waitForCommit(mockConsumer, offset + 1);
        }

        addRecord(mockConsumer, ++offset, 1L);
        addRecord(mockConsumer, ++offset, 1L);
        runOnce(processingThreadsEnabled);
@@ -3059,6 +3074,18 @@ }
        }
    }

    private void waitForCommit(final MockConsumer<byte[], byte[]> mockConsumer, final long expectedOffset) throws Exception {
        waitForCondition(() -> {
                mockTime.sleep(10L);
                runOnce(true);
                final Map<TopicPartition, OffsetAndMetadata> committed = mockConsumer.committed(Collections.singleton(t1p1));
                return !committed.isEmpty() && committed.get(t1p1).offset() == expectedOffset;
            },
            "Never committed offset " + expectedOffset
        );
    }

    @ParameterizedTest
    @MethodSource("data")
    public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {