From 661e41c92f9cee4ea97835de40ecf328330465f5 Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Thu, 22 Feb 2024 17:10:11 -0800 Subject: [PATCH] KAFKA-16302: Remove check for log message that is no longer present (fix builds) (#15422) a3528a3 removed this log but not the test asserting it. Builds are currently red because for some reason these tests can't retry. We should address that as a followup. Reviewers: Greg Harris , Matthias J. Sax --- ...lSchemaRocksDBSegmentedBytesStoreTest.java | 21 +++++++------------ ...bstractRocksDBSegmentedBytesStoreTest.java | 21 +++++++------------ .../AbstractSessionBytesStoreTest.java | 19 +++++++---------- .../AbstractWindowBytesStoreTest.java | 17 ++++++--------- 4 files changed, 27 insertions(+), 51 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index c81e57589a0..c7af25726a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -45,7 +45,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; @@ -83,7 +82,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -1579,7 +1577,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest bytesStore = getBytesStore(); final InternalMockProcessorContext context = new InternalMockProcessorContext( @@ -1590,18 +1588,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest("dummy", nextSegmentWindow)), serializeValue(0)); + // write a record to advance stream time, with a high enough timestamp + // that the subsequent record in windows[0] will already be expired. + bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0)); - final Bytes key = serializeKey(new Windowed<>("a", windows[0])); - final byte[] value = serializeValue(5); - bytesStore.put(key, value); - - final List messages = appender.getMessages(); - assertThat(messages, hasItem("Skipping record for expired segment.")); - } + final Bytes key = serializeKey(new Windowed<>("a", windows[0])); + final byte[] value = serializeValue(5); + bytesStore.put(key, value); final Map metrics = context.metrics().metrics(); final String threadId = Thread.currentThread().getName(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 852f6fd3ceb..a20bebd0081 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime; @@ -81,7 +80,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; @@ -784,7 +782,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest @Test - public void shouldLogAndMeasureExpiredRecords() { + public void shouldMeasureExpiredRecords() { final Properties streamsConfig = StreamsTestUtils.getStreamsConfig(); final AbstractRocksDBSegmentedBytesStore bytesStore = getBytesStore(); final InternalMockProcessorContext context = new InternalMockProcessorContext( @@ -795,18 +793,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest context.setSystemTimeMs(time.milliseconds()); bytesStore.init((StateStoreContext) context, bytesStore); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { - // write a record to advance stream time, with a high enough timestamp - // that the subsequent record in windows[0] will already be expired. - bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0)); + // write a record to advance stream time, with a high enough timestamp + // that the subsequent record in windows[0] will already be expired. + bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0)); - final Bytes key = serializeKey(new Windowed<>("a", windows[0])); - final byte[] value = serializeValue(5); - bytesStore.put(key, value); - - final List messages = appender.getMessages(); - assertThat(messages, hasItem("Skipping record for expired segment.")); - } + final Bytes key = serializeKey(new Windowed<>("a", windows[0])); + final byte[] value = serializeValue(5); + bytesStore.put(key, value); final Map metrics = context.metrics().metrics(); final String threadId = Thread.currentThread().getName(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 56c77ce4fe1..2d75f751344 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -793,7 +793,7 @@ public abstract class AbstractSessionBytesStoreTest { } @Test - public void shouldLogAndMeasureExpiredRecords() { + public void shouldMeasureExpiredRecords() { final Properties streamsConfig = StreamsTestUtils.getStreamsConfig(); final SessionStore sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long()); final InternalMockProcessorContext context = new InternalMockProcessorContext( @@ -806,18 +806,13 @@ public abstract class AbstractSessionBytesStoreTest { context.setSystemTimeMs(time.milliseconds()); sessionStore.init((StateStoreContext) context, sessionStore); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { - // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired - // Note that rocksdb will only expire segments at a time (where segment interval = 60,000 for this retention period) - sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L); + // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired + // Note that rocksdb will only expire segments at a time (where segment interval = 60,000 for this retention period) + sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L); - // Try inserting a record with timestamp 0 -- should be dropped - sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L); - sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L); - - final List messages = appender.getMessages(); - assertThat(messages, hasItem("Skipping record for expired segment.")); - } + // Try inserting a record with timestamp 0 -- should be dropped + sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L); + sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L); final Map metrics = context.metrics().metrics(); final String threadId = Thread.currentThread().getName(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index d29c6bf88d4..688efab857d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -984,7 +984,7 @@ public abstract class AbstractWindowBytesStoreTest { } @Test - public void shouldLogAndMeasureExpiredRecords() { + public void shouldMeasureExpiredRecords() { final Properties streamsConfig = StreamsTestUtils.getStreamsConfig(); final WindowStore windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String()); @@ -998,17 +998,12 @@ public abstract class AbstractWindowBytesStoreTest { context.setTime(1L); windowStore.init((StateStoreContext) context, windowStore); - try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { - // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired - windowStore.put(1, "initial record", 2 * RETENTION_PERIOD); + // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired + windowStore.put(1, "initial record", 2 * RETENTION_PERIOD); - // Try inserting a record with timestamp 0 -- should be dropped - windowStore.put(1, "late record", 0L); - windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1); - - final List messages = appender.getMessages(); - assertThat(messages, hasItem("Skipping record for expired segment.")); - } + // Try inserting a record with timestamp 0 -- should be dropped + windowStore.put(1, "late record", 0L); + windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1); final Map metrics = context.metrics().metrics();