KAFKA-8289: Fix Session Expiration and Suppression (#6654)

Fix two problems in Streams:

* Session windows were expired prematurely (an off-by-one error): a session window's end is inclusive, unlike other window types, so a window whose end equals the close time is still open
* Suppress durations for sessions incorrectly waited only the grace period, but a session window isn't closed until gracePeriod + sessionGap has elapsed past its end (both corrected rules are sketched below)
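Both fixes reduce to two boundary rules. A minimal sketch, using illustrative names rather than the actual Streams internals:

    // Rule 1: a session window's end is inclusive, so a window ending exactly
    // at the close time is still open; expire only on strict inequality.
    final long closeTime = observedStreamTime - gracePeriodMs;
    final boolean expired = sessionWindowEnd < closeTime;

    // Rule 2: a record stamped up to inactivityGapMs after a session's end can
    // still merge into that session, and may itself arrive gracePeriodMs late,
    // so the session's result is not final until:
    final long sessionCloseTime = sessionWindowEnd + inactivityGapMs + gracePeriodMs;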

Update the tests accordingly

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
John Roesler 2019-05-02 19:44:53 -05:00 committed by Guozhang Wang
parent a4f7675db1
commit eab855541a
5 changed files with 128 additions and 49 deletions

KStreamSessionWindowAggregate.java

@@ -133,19 +133,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
}
}
-if (mergedWindow.end() > closeTime) {
-if (!mergedWindow.equals(newSessionWindow)) {
-for (final KeyValue<Windowed<K>, Agg> session : merged) {
-store.remove(session.key);
-tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null);
-}
-}
-agg = aggregator.apply(key, value, agg);
-final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-store.put(sessionKey, agg);
-tupleForwarder.maybeForward(sessionKey, agg, null);
-} else {
+if (mergedWindow.end() < closeTime) {
LOG.debug(
"Skipping record for expired window. " +
"key=[{}] " +
@@ -153,7 +141,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
"window=[{},{}) " +
"window=[{},{}] " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
@@ -167,6 +155,18 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
observedStreamTime
);
lateRecordDropSensor.record();
+} else {
+if (!mergedWindow.equals(newSessionWindow)) {
+for (final KeyValue<Windowed<K>, Agg> session : merged) {
+store.remove(session.key);
+tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null);
+}
+}
+agg = aggregator.apply(key, value, agg);
+final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+store.put(sessionKey, agg);
+tupleForwarder.maybeForward(sessionKey, agg, null);
}
}
}
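The reordered check is easiest to read at the boundary. A hedged sketch of the rule (a hypothetical helper, not part of the patch):

    static boolean isExpired(final long sessionEnd, final long streamTime, final long graceMs) {
        final long closeTime = streamTime - graceMs;
        return sessionEnd < closeTime; // inclusive end: equality keeps the window open
    }
    // isExpired(10, 20, 10) == false -> end equals closeTime, record still admitted
    // isExpired(9, 20, 10)  == true  -> window expired, record dropped and metered

Note that the log format changed accordingly, from window=[{},{}) to window=[{},{}], matching the inclusive upper bound.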

GraphGraceSearchUtil.java

@@ -78,7 +78,7 @@ public final class GraphGraceSearchUtil {
} else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
final SessionWindows windows = kStreamSessionWindowAggregate.windows();
-return windows.gracePeriodMs();
+return windows.gracePeriodMs() + windows.inactivityGap();
} else {
return null;
}
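The extra term accounts for session semantics: a record timestamped up to one inactivity gap after a session's end can still merge into (and extend) that session, and such a record may itself arrive up to the grace period late. A sketch of the bound this returns (a hypothetical helper, not the real API):

    static long sessionCloseTime(final long sessionEnd, final long inactivityGapMs, final long graceMs) {
        // only once stream time passes this point can no further record
        // extend the session, so only then is its result final
        return sessionEnd + inactivityGapMs + graceMs;
    }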

KStreamSessionWindowAggregateProcessorTest.java

@@ -329,11 +329,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
-public void shouldLogAndMeterWhenSkippingLateRecord() {
+public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
-SessionWindows.with(ofMillis(10L)).grace(ofMillis(10L)),
+SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
STORE_NAME,
initializer,
aggregator,
@@ -343,14 +343,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
initStore(false);
processor.init(context);
-// dummy record to advance stream time
-context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+// dummy record to establish stream time = 0
+context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("dummy", "dummy");
+// record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
-processor.process("A", "1");
+processor.process("OnTime1", "1");
+// dummy record to advance stream time = 1
context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
-processor.process("A", "1");
+processor.process("dummy", "dummy");
+// record is late
+context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+processor.process("Late1", "1");
LogCaptureAppender.unregister(appender);
final MetricName dropMetric = new MetricName(
@@ -364,7 +371,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
)
);
-assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
final MetricName dropRate = new MetricName(
"late-record-drop-rate",
@@ -382,9 +389,80 @@ public class KStreamSessionWindowAggregateProcessorTest {
greaterThan(0.0));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
}
@Test
public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
STORE_NAME,
initializer,
aggregator,
sessionMerger
).get();
initStore(false);
processor.init(context);
// dummy record to establish stream time = 0
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("dummy", "dummy");
// record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime1", "1");
// dummy record to advance stream time = 1
context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
processor.process("dummy", "dummy");
// delayed record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("OnTime2", "1");
// dummy record to advance stream time = 2
context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
processor.process("dummy", "dummy");
// delayed record arrives late
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("Late1", "1");
LogCaptureAppender.unregister(appender);
final MetricName dropMetric = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "test"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
);
assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
final MetricName dropRate = new MetricName(
"late-record-drop-rate",
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("client-id", "test"),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
);
assertThat(
(Double) metrics.metrics().get(dropRate).metricValue(),
greaterThan(0.0));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
}
}
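The expected log lines follow directly from the corrected drop rule. For the nonzero-grace case, for example (values taken from the test above):

    final long streamTime = 2L;                // advanced by the last dummy record
    final long grace = 1L;
    final long closeTime = streamTime - grace; // 1, reported as "expiration=[1]"
    // Late1's merged window is [0,0] (no prior session for that key), and
    // 0 < 1 holds, so the record is dropped and the drop count reads 1.0.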

SuppressScenarioTest.java

@@ -47,7 +47,6 @@ import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -72,9 +71,9 @@ public class SuppressScenarioTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private final Properties config = Utils.mkProperties(Utils.mkMap(
-Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
-Utils.mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+Utils.mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
));
@Test
@@ -304,7 +303,7 @@ public class SuppressScenarioTest {
.count();
valueCounts
// this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-.suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+.suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
.toStream()
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
@@ -352,7 +351,6 @@ public class SuppressScenarioTest {
}
}
-@SuppressWarnings("deprecation")
@Test
public void shouldSupportFinalResultsForTimeWindows() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -403,7 +401,6 @@ public class SuppressScenarioTest {
}
}
-@SuppressWarnings("deprecation")
@Test
public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -465,7 +462,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-.windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+.windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
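For reference, this is the end-to-end pattern the scenario exercises (a sketch assuming the standard Streams DSL and the test's serdes; topic names as in the test):

    builder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
        .groupByKey(Grouped.with(STRING_SERDE, STRING_SERDE))
        .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
        .count()
        // with the fix, one final count per session is emitted only after
        // stream time passes session end + inactivity gap + grace
        .suppress(untilWindowCloses(unbounded()))
        .toStream()
        .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
        .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));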
@@ -482,34 +479,38 @@ public class SuppressScenarioTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
// first window
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
// arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
// new window
driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
// any record in the same partition advances stream time (note the key is different)
driver.pipeInput(recordFactory.create("input", "k2", "v1", 6L));
// late event for first window - this should get dropped from all streams, since the first window is now closed.
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
// just pushing stream time forward to flush the other events through.
driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
)
);
}
}
-private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
if (results.size() != expectedResults.size()) {
throw new AssertionError(printRecords(results) + " != " + expectedResults);
}
@@ -524,7 +525,7 @@ public class SuppressScenarioTest {
}
}
-private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
final List<ProducerRecord<K, V>> result = new LinkedList<>();
for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
next != null;
@@ -534,11 +535,11 @@ public class SuppressScenarioTest {
return new ArrayList<>(result);
}
-private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+private static <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
final StringBuilder resultStr = new StringBuilder();
resultStr.append("[\n");
for (final ProducerRecord<?, ?> record : result) {
resultStr.append(" ").append(record.toString()).append("\n");
resultStr.append(" ").append(record).append("\n");
}
resultStr.append("]");
return resultStr.toString();

GraphGraceSearchUtilTest.java

@@ -118,11 +118,11 @@ public class GraphGraceSearchUtilTest {
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-assertThat(extracted, is(windows.gracePeriodMs()));
+assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
@Test
-public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
@@ -155,11 +155,11 @@ public class GraphGraceSearchUtilTest {
statefulParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-assertThat(extracted, is(windows.gracePeriodMs()));
+assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
@Test
-public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
@@ -183,7 +183,7 @@ public class GraphGraceSearchUtilTest {
statelessParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-assertThat(extracted, is(windows.gracePeriodMs()));
+assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
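A quick sanity check of the arithmetic, in the same style as these tests (illustrative, not part of the patch):

    final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
    // suppression downstream must wait 10 + 1234 = 1244 ms past each session's end
    assertThat(windows.gracePeriodMs() + windows.inactivityGap(), is(1244L));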
@Test