KAFKA-16302: Remove check for log message that is no longer present (fix builds) (#15422)

Commit a3528a3 removed this log message but not the tests asserting it.

Builds are currently red because, for some reason, these tests cannot retry. We should address that in a follow-up.

Reviewers: Greg Harris <greg.harris@aiven.io>, Matthias J. Sax <matthias@confluent.io>
Justine Olshan 2024-02-22 17:10:11 -08:00 (committed by GitHub)
commit 661e41c92f, parent a512ef1478
4 changed files with 27 additions and 51 deletions
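
All four removed assertions follow the same LogCaptureAppender pattern. Below is a minimal sketch of the now-stale check, using only the calls visible in the diffs that follow; the surrounding test class and the elided store write are placeholders, and the JUnit 5 import is an assumption:

import java.util.List;

import org.apache.kafka.common.utils.LogCaptureAppender;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;

public class ExpiredSegmentLogSketchTest {

    @Test
    public void shouldAssertExpiredSegmentLog() {
        // Capture everything logged while the store drops an expired record.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            // ... advance stream time, then write a record old enough to be expired ...

            // This is the assertion the commit deletes: after a3528a3 the store
            // no longer logs "Skipping record for expired segment.", so hasItem()
            // can never match and the tests fail.
            final List<String> messages = appender.getMessages();
            assertThat(messages, hasItem("Skipping record for expired segment."));
        }
    }
}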

View File

@@ -45,7 +45,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -83,7+82,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1579,7 +1577,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 @Test
-public void shouldLogAndMeasureExpiredRecords() {
+public void shouldMeasureExpiredRecords() {
 final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
 final AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();
 final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -1590,7 +1588,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 context.setSystemTimeMs(time.milliseconds());
 bytesStore.init((StateStoreContext) context, bytesStore);
-try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
 // write a record to advance stream time, with a high enough timestamp
 // that the subsequent record in windows[0] will already be expired.
 bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
@@ -1599,10 +1596,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 final byte[] value = serializeValue(5);
 bytesStore.put(key, value);
-final List<String> messages = appender.getMessages();
-assertThat(messages, hasItem("Skipping record for expired segment."));
-}
 final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
 final String threadId = Thread.currentThread().getName();
 final Metric dropTotal;

View File

@@ -27,7 +27,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
@@ -81,7 +80,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
@@ -784,7 +782,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 @Test
-public void shouldLogAndMeasureExpiredRecords() {
+public void shouldMeasureExpiredRecords() {
 final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
 final AbstractRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();
 final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -795,7 +793,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 context.setSystemTimeMs(time.milliseconds());
 bytesStore.init((StateStoreContext) context, bytesStore);
-try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
 // write a record to advance stream time, with a high enough timestamp
 // that the subsequent record in windows[0] will already be expired.
 bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
@@ -804,10 +801,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 final byte[] value = serializeValue(5);
 bytesStore.put(key, value);
-final List<String> messages = appender.getMessages();
-assertThat(messages, hasItem("Skipping record for expired segment."));
-}
 final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
 final String threadId = Thread.currentThread().getName();
 final Metric dropTotal;

View File

@@ -793,7 +793,7 @@ public abstract class AbstractSessionBytesStoreTest {
 }
 @Test
-public void shouldLogAndMeasureExpiredRecords() {
+public void shouldMeasureExpiredRecords() {
 final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
 final SessionStore<String, Long> sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
 final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -806,7 +806,6 @@ public abstract class AbstractSessionBytesStoreTest {
 context.setSystemTimeMs(time.milliseconds());
 sessionStore.init((StateStoreContext) context, sessionStore);
-try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
 // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
 // Note that rocksdb will only expire segments at a time (where segment interval = 60,000 for this retention period)
 sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
@@ -815,10 +814,6 @@ public abstract class AbstractSessionBytesStoreTest {
 sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L);
 sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
-final List<String> messages = appender.getMessages();
-assertThat(messages, hasItem("Skipping record for expired segment."));
-}
 final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
 final String threadId = Thread.currentThread().getName();
 final Metric dropTotal;

View File

@@ -984,7 +984,7 @@ public abstract class AbstractWindowBytesStoreTest {
 }
 @Test
-public void shouldLogAndMeasureExpiredRecords() {
+public void shouldMeasureExpiredRecords() {
 final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
 final WindowStore<Integer, String> windowStore =
 buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
@@ -998,7 +998,6 @@ public abstract class AbstractWindowBytesStoreTest {
 context.setTime(1L);
 windowStore.init((StateStoreContext) context, windowStore);
-try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
 // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
 windowStore.put(1, "initial record", 2 * RETENTION_PERIOD);
@@ -1006,10 +1005,6 @@ public abstract class AbstractWindowBytesStoreTest {
 windowStore.put(1, "late record", 0L);
 windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1);
-final List<String> messages = appender.getMessages();
-assertThat(messages, hasItem("Skipping record for expired segment."));
-}
 final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
 final String threadId = Thread.currentThread().getName();
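
Each diff is truncated where the metric lookups begin; the remainder of each test body, untouched by this commit, verifies the drop through metrics rather than the log, which is why the rename to shouldMeasureExpiredRecords is safe. A hedged sketch of that style of check, continuing from the metrics and threadId context lines above; the metric description string and tag values here are assumptions for illustration, not the literal test code:

// Sketch only: look up the task-level dropped-records metric and assert on it.
// "dropped-records-total" / "stream-task-metrics" are the standard Streams names,
// but the description and tags below are illustrative.
final Metric dropTotal = metrics.get(new MetricName(
    "dropped-records-total",
    "stream-task-metrics",
    "The total number of dropped records",
    mkMap(
        mkEntry("thread-id", threadId),
        mkEntry("task-id", "0_0")
    )
));
// One record ("late record") should have been dropped as expired.
assertEquals(1.0, dropTotal.metricValue());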